# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""A miscellaneous collection of basic functions."""
from __future__ import (absolute_import, unicode_literals, division,
print_function)
import numpy as np
import logging
import sys
[docs]def r_in(td, r_0):
"""Calculate incident countrate given dead time and detected countrate."""
tau = 1 / r_0
return 1. / (tau - td)
[docs]def r_det(td, r_in):
"""Calculate detected countrate given dead time and incident countrate."""
tau = 1 / r_in
return 1. / (tau + td)
def _assign_value_if_none(value, default):
if value is None:
return default
else:
return value
def _look_for_array_in_array(array1, array2):
for a1 in array1:
if a1 in array2:
return a1
[docs]def is_string(s):
"""Portable function to answer this question."""
PY3 = sys.version_info[0] == 3
if PY3:
return isinstance(s, str) # NOQA
else:
return isinstance(s, basestring) # NOQA
def _order_list_of_arrays(data, order):
if hasattr(data, 'items'):
data = dict([(i[0], i[1][order])
for i in data.items()])
elif hasattr(data, 'index'):
data = [i[order] for i in data]
else:
data = None
return data
class _empty():
def __init__(self):
pass
[docs]def mkdir_p(path): # pragma: no cover
"""Safe mkdir function.
Parameters
----------
path : str
Name of the directory/ies to create
Notes
-----
Found at
http://stackoverflow.com/questions/600268/mkdir-p-functionality-in-python
"""
import os
import errno
try:
os.makedirs(path)
except OSError as exc: # Python >2.5
if exc.errno == errno.EEXIST and os.path.isdir(path):
pass
else:
raise
[docs]def ref_mjd(fits_file, hdu=1):
"""Read MJDREFF+ MJDREFI or, if failed, MJDREF, from the FITS header.
Parameters
----------
fits_file : str
Returns
-------
mjdref : numpy.longdouble
the reference MJD
Other Parameters
----------------
hdu : int
"""
import collections
if isinstance(fits_file, collections.Iterable) and\
not is_string(fits_file): # pragma: no cover
fits_file = fits_file[0]
logging.info("opening %s" % fits_file)
try:
ref_mjd_int = np.long(read_header_key(fits_file, 'MJDREFI'))
ref_mjd_float = np.longdouble(read_header_key(fits_file, 'MJDREFF'))
ref_mjd_val = ref_mjd_int + ref_mjd_float
except: # pragma: no cover
ref_mjd_val = np.longdouble(read_header_key(fits_file, 'MJDREF'))
return ref_mjd_val
[docs]def common_name(str1, str2, default='common'):
"""Strip two strings of the letters not in common.
Filenames must be of same length and only differ by a few letters.
Parameters
----------
str1 : str
str2 : str
Returns
-------
common_str : str
A string containing the parts of the two names in common
Other Parameters
----------------
default : str
The string to return if common_str is empty
"""
if not len(str1) == len(str2):
return default
common_str = ''
# Extract the MP root of the name (in case they're event files)
str1 = mp_root(str1)
str2 = mp_root(str2)
for i, letter in enumerate(str1):
if str2[i] == letter:
common_str += letter
# Remove leading and trailing underscores and dashes
common_str = common_str.rstrip('_').rstrip('-')
common_str = common_str.lstrip('_').lstrip('-')
if common_str == '':
common_str = default
logging.debug('common_name: %s %s -> %s' % (str1, str2, common_str))
return common_str
[docs]def mp_root(filename):
"""Return the root file name (without _ev, _lc, etc.).
Parameters
----------
filename : str
"""
import os.path
fname = filename.replace('.gz', '')
fname = os.path.splitext(filename)[0]
fname = fname.replace('_ev', '').replace('_lc', '')
fname = fname.replace('_calib', '')
return fname
[docs]def contiguous_regions(condition):
"""Find contiguous True regions of the boolean array "condition".
Return a 2D array where the first column is the start index of the region
and the second column is the end index.
Parameters
----------
condition : boolean array
Returns
-------
idx : [[i0_0, i0_1], [i1_0, i1_1], ...]
A list of integer couples, with the start and end of each True blocks
in the original array
Notes
-----
From http://stackoverflow.com/questions/4494404/find-large-number-of-consecutive-values-fulfilling-condition-in-a-numpy-array
""" # NOQA
# Find the indicies of changes in "condition"
diff = np.diff(condition)
idx, = diff.nonzero()
# We need to start things after the change in "condition". Therefore,
# we'll shift the index by 1 to the right.
idx += 1
if condition[0]:
# If the start of condition is True prepend a 0
idx = np.r_[0, idx]
if condition[-1]:
# If the end of condition is True, append the length of the array
idx = np.r_[idx, condition.size]
# Reshape the result into two columns
idx.shape = (-1, 2)
return idx
[docs]def check_gtis(gti):
"""Check if GTIs are well-behaved. No start>end, no overlaps.
Raises
------
AssertionError
If GTIs are not well-behaved.
"""
gti_start = gti[:, 0]
gti_end = gti[:, 1]
logging.debug('-- GTI: ' + repr(gti))
# Check that GTIs are well-behaved
assert np.all(gti_end >= gti_start), 'This GTI is incorrect'
# Check that there are no overlaps in GTIs
assert np.all(gti_start[1:] >= gti_end[:-1]), 'This GTI has overlaps'
logging.debug('-- Correct')
return
[docs]def create_gti_mask(time, gtis, safe_interval=0, min_length=0,
return_new_gtis=False, dt=None):
"""Create GTI mask.
Assumes that no overlaps are present between GTIs
Parameters
----------
time : float array
gtis : [[g0_0, g0_1], [g1_0, g1_1], ...], float array-like
Returns
-------
mask : boolean array
new_gtis : Nx2 array
Other parameters
----------------
safe_interval : float or [float, float]
A safe interval to exclude at both ends (if single float) or the start
and the end (if pair of values) of GTIs.
min_length : float
return_new_gtis : bool
dt : float
"""
import collections
check_gtis(gtis)
dt = _assign_value_if_none(dt,
np.zeros_like(time) + (time[1] - time[0]) / 2)
mask = np.zeros(len(time), dtype=bool)
if not isinstance(safe_interval, collections.Iterable):
safe_interval = [safe_interval, safe_interval]
newgtis = np.zeros_like(gtis)
# Whose GTIs, including safe intervals, are longer than min_length
newgtimask = np.zeros(len(newgtis), dtype=np.bool)
for ig, gti in enumerate(gtis):
limmin, limmax = gti
limmin += safe_interval[0]
limmax -= safe_interval[1]
if limmax - limmin >= min_length:
newgtis[ig][:] = [limmin, limmax]
cond1 = time - dt >= limmin
cond2 = time + dt <= limmax
good = np.logical_and(cond1, cond2)
mask[good] = True
newgtimask[ig] = True
res = mask
if return_new_gtis:
res = [res, newgtis[newgtimask]]
return res
[docs]def create_gti_from_condition(time, condition,
safe_interval=0, dt=None):
"""Create a GTI list from a time array and a boolean mask ("condition").
Parameters
----------
time : array-like
Array containing times
condition : array-like
An array of bools, of the same length of time.
A possible condition can be, e.g., the result of lc > 0.
Returns
-------
gtis : [[gti0_0, gti0_1], [gti1_0, gti1_1], ...]
The newly created GTIs
Other parameters
----------------
safe_interval : float or [float, float]
A safe interval to exclude at both ends (if single float) or the start
and the end (if pair of values) of GTIs.
dt : float
The width (in sec) of each bin of the time array. Can be irregular.
"""
import collections
assert len(time) == len(condition), \
'The length of the condition and time arrays must be the same.'
idxs = contiguous_regions(condition)
if not isinstance(safe_interval, collections.Iterable):
safe_interval = [safe_interval, safe_interval]
dt = _assign_value_if_none(dt,
np.zeros_like(time) + (time[1] - time[0]) / 2)
gtis = []
for idx in idxs:
logging.debug(idx)
startidx = idx[0]
stopidx = idx[1]-1
t0 = time[startidx] - dt[startidx] + safe_interval[0]
t1 = time[stopidx] + dt[stopidx] - safe_interval[1]
if t1 - t0 < 0:
continue
gtis.append([t0, t1])
return np.array(gtis)
[docs]def cross_two_gtis(gti0, gti1):
"""Extract the common intervals from two GTI lists *EXACTLY*.
Parameters
----------
gti0 : [[gti0_0, gti0_1], [gti1_0, gti1_1], ...]
gti1 : [[gti0_0, gti0_1], [gti1_0, gti1_1], ...]
Returns
-------
gtis : [[gti0_0, gti0_1], [gti1_0, gti1_1], ...]
The newly created GTIs
See Also
--------
cross_gtis : From multiple GTI lists, extract common intervals *EXACTLY*
"""
gti0 = np.array(gti0, dtype=np.longdouble)
gti1 = np.array(gti1, dtype=np.longdouble)
# Check GTIs
check_gtis(gti0)
check_gtis(gti1)
gti0_start = gti0[:, 0]
gti0_end = gti0[:, 1]
gti1_start = gti1[:, 0]
gti1_end = gti1[:, 1]
# Create a list that references to the two start and end series
gti_start = [gti0_start, gti1_start]
gti_end = [gti0_end, gti1_end]
# Concatenate the series, while keeping track of the correct origin of
# each start and end time
gti0_tag = np.array([0 for g in gti0_start], dtype=bool)
gti1_tag = np.array([1 for g in gti1_start], dtype=bool)
conc_start = np.concatenate((gti0_start, gti1_start))
conc_end = np.concatenate((gti0_end, gti1_end))
conc_tag = np.concatenate((gti0_tag, gti1_tag))
# Put in time order
order = np.argsort(conc_end)
conc_start = conc_start[order]
conc_end = conc_end[order]
conc_tag = conc_tag[order]
last_end = conc_start[0] - 1
final_gti = []
for ie, e in enumerate(conc_end):
# Is this ending in series 0 or 1?
this_series = conc_tag[ie]
other_series = not this_series
# Check that this closes intervals in both series.
# 1. Check that there is an opening in both series 0 and 1 lower than e
try:
st_pos = \
np.argmax(gti_start[this_series][gti_start[this_series] < e])
so_pos = \
np.argmax(gti_start[other_series][gti_start[other_series] < e])
st = gti_start[this_series][st_pos]
so = gti_start[other_series][so_pos]
s = max([st, so])
except: # pragma: no cover
continue
# If this start is inside the last interval (It can happen for equal
# GTI start times between the two series), then skip!
if s <= last_end:
continue
# 2. Check that there is no closing before e in the "other series",
# from intervals starting either after s, or starting and ending
# between the last closed interval and this one
cond1 = (gti_end[other_series] > s) * (gti_end[other_series] < e)
cond2 = gti_end[other_series][so_pos] < s
condition = np.any(np.logical_or(cond1, cond2))
# Well, if none of the conditions at point 2 apply, then you can
# create the new gti!
if not condition:
final_gti.append([s, e])
last_end = e
return np.array(final_gti, dtype=np.longdouble)
[docs]def cross_gtis(gti_list):
"""From multiple GTI lists, extract the common intervals *EXACTLY*.
Parameters
----------
gti_list : array-like
List of GTI arrays, each one in the usual format [[gti0_0, gti0_1],
[gti1_0, gti1_1], ...]
Returns
-------
gtis : [[gti0_0, gti0_1], [gti1_0, gti1_1], ...]
The newly created GTIs
See Also
--------
cross_two_gtis : Extract the common intervals from two GTI lists *EXACTLY*
"""
ninst = len(gti_list)
if ninst == 1:
return gti_list[0]
gti0 = gti_list[0]
for gti in gti_list[1:]:
gti0 = cross_two_gtis(gti0, gti)
return gti0
[docs]def get_btis(gtis, start_time=None, stop_time=None):
"""From GTIs, obtain bad time intervals.
GTIs have to be well-behaved, in the sense that they have to pass
`check_gtis`.
"""
# Check GTIs
if len(gtis) == 0:
assert start_time is not None and stop_time is not None, \
'Empty GTI and no valid start_time and stop_time. BAD!'
return np.array([[start_time, stop_time]], dtype=np.longdouble)
check_gtis(gtis)
start_time = _assign_value_if_none(start_time, gtis[0][0])
stop_time = _assign_value_if_none(stop_time, gtis[-1][1])
if gtis[0][0] - start_time <= 0:
btis = []
else:
btis = [[gtis[0][0] - start_time]]
# Transform GTI list in
flat_gtis = gtis.flatten()
new_flat_btis = zip(flat_gtis[1:-2:2], flat_gtis[2:-1:2])
btis.extend(new_flat_btis)
if stop_time - gtis[-1][1] > 0:
btis.extend([[gtis[0][0] - stop_time]])
return np.array(btis, dtype=np.longdouble)
[docs]def optimal_bin_time(fftlen, tbin):
"""Vary slightly the bin time to have a power of two number of bins.
Given an FFT length and a proposed bin time, return a bin time
slightly shorter than the original, that will produce a power-of-two number
of FFT bins.
"""
import numpy as np
return fftlen / (2 ** np.ceil(np.log2(fftlen / tbin)))
[docs]def detection_level(nbins, epsilon=0.01, n_summed_spectra=1, n_rebin=1):
r"""Detection level for a PDS.
Return the detection level (with probability 1 - epsilon) for a Power
Density Spectrum of nbins bins, normalized a la Leahy (1983), based on
the 2-dof :math:`{\chi}^2` statistics, corrected for rebinning (n_rebin)
and multiple PDS averaging (n_summed_spectra)
"""
try:
from scipy import stats
except: # pragma: no cover
raise Exception('You need Scipy to use this function')
import collections
if not isinstance(n_rebin, collections.Iterable):
r = n_rebin
retlev = stats.chi2.isf(epsilon / nbins, 2 * n_summed_spectra * r) \
/ (n_summed_spectra * r)
else:
retlev = [stats.chi2.isf(epsilon / nbins, 2 * n_summed_spectra * r) /
(n_summed_spectra * r) for r in n_rebin]
retlev = np.array(retlev)
return retlev
[docs]def probability_of_power(level, nbins, n_summed_spectra=1, n_rebin=1):
r"""Give the probability of a given power level in PDS.
Return the probability of a certain power level in a Power Density
Spectrum of nbins bins, normalized a la Leahy (1983), based on
the 2-dof :math:`{\chi}^2` statistics, corrected for rebinning (n_rebin)
and multiple PDS averaging (n_summed_spectra)
"""
try:
from scipy import stats
except: # pragma: no cover
raise Exception('You need Scipy to use this function')
epsilon = nbins * stats.chi2.sf(level * n_summed_spectra * n_rebin,
2 * n_summed_spectra * n_rebin)
return 1 - epsilon
[docs]def calc_countrate(time, lc, gtis=None, bintime=None):
"""Calculate the count rate from a light curve.
Parameters
----------
time : array-like
lc : array-like
Returns
-------
countrate : float
The mean count rate
Other Parameters
----------------
gtis : [[gti0_0, gti0_1], [gti1_0, gti1_1], ...]
bintime : float
The bin time of the light curve. If not specified, the minimum
difference between time bins is used
"""
bintime = _assign_value_if_none(bintime, np.min(np.diff(time)))
if gtis is not None:
mask = create_gti_mask(time, gtis)
lc = lc[mask]
return np.mean(lc) / bintime
[docs]def gti_len(gti):
"""Return the total good time from a list of GTIs."""
return np.sum([g[1] - g[0] for g in gti])