"""Object profiles contains commonly used profiles of different types of source
objects such as PointSource, Disk etc...
"""
import warnings
import copy
import os
import numpy as np
from scipy import stats
import cv2
from lfd.analysis.profiles.convolutionobj import ConvolutionObject
from lfd.analysis.profiles.consts import *
import lfd.analysis.profiles.utils as utils
__all__ = ["PointSource", "GaussianSource", "DiskSource", "RabinaSource", "exp_fwhms"]
[docs]class PointSource(ConvolutionObject):
"""Simple point like source. Point-like sources are not resolved, therefore
regardless of scale and size only a single element of obj will have a value
Parameters
----------
h : float
height of the object, in meters
res : float
desired resolution in arcseconds. The scale step (the difference between
two neighbouring "x-axis" points) is then determined by scaling the
appropriate angular size of the object by the resolution, so that the
object remains unresolved at the required resolution.
"""
def __init__(self, h, res=0.001, **kwargs):
self.h = h
theta = 1.0/(h*1000.) * RAD2ARCSEC
scale = np.arange(-theta, theta, theta*res)
obj = self.f(scale)
ConvolutionObject.__init__(self, obj, scale)
[docs] def f(self, r):
"""Returns the intensity value of the object at a point. Evaluating a
point source over only handfull of points is not well defined. The
function may not behave properly if number of points is very small,
i.e. 2 or 3 points only.
"""
if hasattr(r, "__iter__"):
# if PointSource is evaluated over an array with finer grid than
# scale and we calculate the values we would get max value for all
# points closer than current scale step - this wouldn't be a point
# so instead a new array of same shape is returned with only 1
# element at max - a point source. We make a check if r is ndarray
# or just a list or tuple
try:
obj = np.zeros(r.shape)
except AttributeError:
obj = np.zeros(len(r))
obj[int(len(obj)/2)] += 1.0
return obj
else:
# in the case where we evaluate at a singular point
# if the position is closer to the location of the peak than the
# resolution of our scale - we are looking at the source, otherwise
# we are looking somewhere else
peak = np.where(self.obj == self.obj.max())[0][0]
if (r - self.scale[peak]) <= self.step:
# standardize the output format to numpy array even in case of
# a single number
return np.array([self.obj.max()])
else:
return np.array([0.0])
[docs]class GaussianSource(ConvolutionObject):
"""Simple gaussian intensity profile.
Parameters
----------
h : float
height of the object, in meters
fwhm : float
FWHM of the gaussian profile
res : float
desired resolution, in arcseconds
units : string
spatial units (meters by default) - currently not very well supported
"""
def __init__(self, h, fwhm, res=0.001, units="meters", **kwargs):
self.h = h
self.theta = fwhm/(h*1000.)*RAD2ARCSEC
self.sigma = fwhm/2.355
self._f = stats.norm(scale=fwhm/2.355).pdf
scale = np.arange(-1.7*self.theta, 1.7*self.theta, self.theta*res)
obj = self.f(scale)
ConvolutionObject.__init__(self, obj, scale)
[docs] def f(self, r):
"""Evaluate the gaussian at a point."""
# standardize the output format to numpy array even in case of a number
if any((isinstance(r, int), isinstance(r, float),
isinstance(r, complex))):
rr = np.array([r], dtype=float)
else:
rr = np.array(r, dtype=float)
return self._f(rr)
[docs]class DiskSource(ConvolutionObject):
"""Brightness profile of a disk-like source.
Parameters
----------
h : float
height of the object, in meters
radius : float
radius of the objects disk, in meters
res : float
desired resolution, in arcseconds
"""
def __init__(self, h, radius, res=0.001, **kwargs):
self.h = h
self.r = radius
self.theta = radius/(2*h*1000.) * RAD2ARCSEC
scale = np.arange(-2*self.theta, 2*self.theta, self.theta*res)
obj = self.f(scale)
ConvolutionObject.__init__(self, obj, scale)
[docs] def f(self, r, units="arcsec"):
"""Returns the brightness value of the object at a point. By default
the units of the scale are arcseconds but radians are also accepted.
"""
# 99% (578x) speedup was achieved refactoring this code - Jan 2018
# standardize the output format to numpy array even in case of a number
if any((isinstance(r, int), isinstance(r, float),
isinstance(r, complex))):
rr = np.array([r], dtype=float)
else:
rr = np.array(r, dtype=float)
if units.upper() == "RAD":
rr = rr*RAD2ARCSEC
theta = self.theta
_f = lambda x: 2*np.sqrt(theta**2-x**2)/(np.pi*theta**2)
# testing showed faster abs performs faster for smaller arrays and
# logical_or outperforms abs by 12% for larger ones
if len(rr) < 50000:
mask = np.abs(rr)>=theta
else:
mask = np.logical_or(rr>theta, rr<-theta)
rr[mask] = 0
rr[~mask] = _f(rr[~mask])
return rr
[docs] def width(self):
"""FWHM is not a good metric for measuring the end size of the object
because disk-like profiles do not taper off towards the top. Instead
width of the object (difference between first and last point with
brightness above zero) is a more appropriate measure of the size of the
object.
"""
left = self.scale[np.where(self.obj > 0)[0][0]]
right = self.scale[np.where(self.obj > 0)[0][-1]]
return right-left
[docs]class RabinaSource(ConvolutionObject):
"""::
Bektesevic & Vinkovic et. al. 2017 (arxiv: 1707.07223) Eq. (9)
a 1D integrated projection of fiducial 3D meteor head model as given by::
Rabina J., et al. 2016, J. Quant. Spectrosc. Radiat. Transf,178, 295
The integration of this profile is complicated and depends on variety of
parameters, such as the angle of the observer linesight and meteor
direction of travel. It is not practical to preform the integration every
time brightness value needs to be estimated (too slow).
A set of pregenerated projections of this profile to a plane were created
where the angle between meteor direction of travel and observer linesight
varies in the range from 0-1.5 radians (0-86 degrees) which are then used
to produce this 1D profile. The 1D profile is created from a crosssection
perpendicular to the meteors direction of travel.
These 2D profiles are avilable as images in the rabina directory along with
the required code to generate them.
The default profile is then scaled appropriately to the desired height and
any missing brightness values are then interpolated between the points.
Parameters
----------
angle : `string`, `float` or `int`
If string the path to the premade 2D integrated Rabina profile projection,
if float or int the angle in radians that will be converted into a filename
of one of the premade Rabina profiles.
h : `float`
height of the object, in meters
Note
----
Integrating 3D Rabina profile to create the 2D projection is very costly.
Premade profiles range from 0 to 1.5 radians in steps of 0.1 radians, or
approximately 0 to 90 degrees in steps of ~6 degrees. If a specific projection
angle is required code to generate one can be found in `profiles/rabina` dir
(see also: profiles.utils.get_rabina_dir).
"""
xmin, xmax = -5., 5.
ymin, ymax = -5., 5.
zmin, zmax = -5., 5.
N = 1000.
def __init__(self, h, angle, **kwargs):
self.projection = angle
self.h = h
try:
imgneg = utils.get_rabina_profile(angle, useCV2=True)
except ValueError:
if not os.path.isfile(self.projection):
raise ValueError(f"Expected path to file or an angle got {projection} instead.")
imgneg = cv2.imread(self.projection, cv2.IMREAD_GRAYSCALE)
if imgneg is None:
raise RuntimeError(f"OpenCV underscriptively failed to open {imgpath}. "
"Try to open it manually.")
#xmin, xmax = RabinaProfile.xmin, RabinaProfile.xmax
#ymin, ymax = RabinaSource.ymin, RabinaSource.ymax
#xstep = (xmax-xmin)/RabinaProfile.N
ystep = (RabinaSource.ymax- RabinaSource.ymin)/RabinaSource.N
white = 255.*np.ones(imgneg.shape)
img = -1.*(imgneg-white)
imgr = np.zeros(img.shape)
imgr[2:-2, 2:-2] += img[2:-2, 2:-2]
self.raw = imgr/imgr.max()
#meterscalex = np.arange(xmin, xmax, xstep)
meterscaley = np.arange( RabinaSource.ymin, RabinaSource.ymax, ystep)
#scalex = meterscalex/(h*1000.) * RAD2ARCSEC
self.scaley = meterscaley/(h*1000.) * RAD2ARCSEC
#self.distx = imgrn.sum(axis=0)
disty = self.raw.sum(axis=1)
ConvolutionObject.__init__(self, disty, self.scaley)
[docs] def f(self, r, units="arcsec"):
"""Returns the brightness value at a desired point."""
# 10x aster than previous impementation - Jan 2018
# standardize the output format
if any((isinstance(r, int), isinstance(r, float),
isinstance(r, complex))):
rr = np.array([r], dtype=float)
else:
rr = np.array(r, dtype=float)
if units.upper() == "RAD":
rr = rr*RAD2ARCSEC
# testing showed faster abs performs faster for smaller arrays and
# logical_or outperforms abs by 12% for larger ones
if len(rr) < 50000:
mask = np.abs(rr)>=self.scaley[-1]
else:
mask = np.logical_or(rr>self.scaley[-1], rr<-self.scaley[-1])
rr[mask] = 0
# RabinaSource will always raise interpolation warning, do not silence
# the longer interpolation warning.
with warnings.catch_warnings():
warnings.filterwarnings("ignore",
message="Required value estimated by interpolation.")
rr[~mask] = self._guessf(rr[~mask])
return rr
[docs]def exp_fwhms(tau, n, duration):
"""Generates series of n exponentially smaller FWHM values depending on the
scale time tau for the desired temporal duration.
Parameters
----------
tau : float
scale time of the decay
n : int
number of generated FWHM values
duration : float
total duration in which n FWHMs will be equaly spaced
"""
times = np.linspace(0, duration, n)
fwhms = np.exp(times/tau)
return fwhms