# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Functions to calculate frequency spectra."""
from __future__ import (absolute_import, unicode_literals, division,
print_function)
from .base import mp_root, cross_gtis, create_gti_mask
from .base import common_name, _empty, _assign_value_if_none
from .rebin import const_rebin
from .io import sort_files, get_file_type, load_lcurve, save_pds
from .io import MP_FILE_EXTENSION
import numpy as np
import logging
import warnings
from multiprocessing import Pool
import os
def _wrap_fun_cpds(arglist):
f1, f2, outname, kwargs = arglist
try:
return calc_cpds(f1, f2, outname=outname, **kwargs)
except Exception as e:
warnings.warn(str(e))
def _wrap_fun_pds(argdict):
fname = argdict["fname"]
argdict.pop("fname")
try:
return calc_pds(fname, **argdict)
except Exception as e:
warnings.warn(str(e))
def fft(lc, bintime):
"""A wrapper for the fft function. Just numpy for now.
Parameters
----------
lc : array-like
bintime : float
Returns
-------
freq : array-like
ft : array-like
the Fourier transform.
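    Examples
    --------
    A minimal sketch (input values are purely illustrative):

    >>> lc = np.ones(16)
    >>> freqs, ft = fft(lc, bintime=0.5)
    >>> freqs.size == ft.size == 16
    True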
"""
nbin = len(lc)
ft = np.fft.fft(lc)
freqs = np.fft.fftfreq(nbin, bintime)
return freqs.astype(np.double), ft
def leahy_pds(lc, bintime):
r"""Calculate the power density spectrum.
    Calculates the Power Density Spectrum à la Leahy+1983, ApJ 266, 160,
given the lightcurve and its bin time.
Assumes no gaps are present! Beware!
Parameters
----------
lc : array-like
the light curve
    bintime : float
the bin time of the light curve
Returns
-------
freqs : array-like
Frequencies corresponding to PDS
pds : array-like
The power density spectrum
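    Examples
    --------
    A minimal sketch, assuming a gapless, Poisson-distributed light curve:

    >>> lc = np.random.poisson(10, 64).astype(float)
    >>> freqs, pds = leahy_pds(lc, bintime=0.1)
    >>> freqs.size == pds.size == 32  # only non-negative frequencies
    True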
"""
nph = sum(lc)
    # Sanity checks must be done beforehand: only good (non-empty) light
    # curves should reach this point
assert (nph > 0), 'Invalid interval. Light curve is empty'
freqs, ft = fft(lc, bintime)
# I'm pretty sure there is a faster way to do this.
pds = np.absolute(ft.conjugate() * ft) * 2. / nph
good = freqs >= 0
freqs = freqs[good]
pds = pds[good]
return freqs, pds
def welch_pds(time, lc, bintime, fftlen, gti=None, return_all=False):
r"""Calculate the PDS, averaged over equal chunks of data.
    Calculates the Power Density Spectrum à la Leahy (1983), given the
lightcurve and its bin time, over equal chunks of length fftlen, and
returns the average of all PDSs, or the sum PDS and the number of chunks
Parameters
----------
time : array-like
Central times of light curve bins
lc : array-like
Light curve
bintime : float
Bin time of the light curve
fftlen : float
Length of each FFT
gti : [[g0_0, g0_1], [g1_0, g1_1], ...]
Good time intervals. Defaults to
[[time[0] - bintime/2, time[-1] + bintime/2]]
Returns
-------
    results : object
        An object containing all the return values below
    f : array-like
        array of frequencies corresponding to PDS bins
    pds : array-like
        the values of the PDS
    epds : array-like
        the errors on the PDS values
    npds : int
        the number of summed PDSs
    ctrate : float
        the average count rate in the light curve
    dynpds : array-like, optional
    edynpds : array-like, optional
dynctrate : array-like, optional
times : array-like, optional
    Other Parameters
----------------
return_all : bool
if True, return everything, including the dynamical PDS
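    Examples
    --------
    A minimal sketch, assuming a gapless Poisson light curve (the default
    GTI spans the whole curve):

    >>> bintime = 0.1
    >>> time = np.arange(bintime / 2, 102.4, bintime)
    >>> lc = np.random.poisson(100, time.size).astype(float)
    >>> results = welch_pds(time, lc, bintime, fftlen=25.6)
    >>> results.npds >= 1
    True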
"""
gti = _assign_value_if_none(
gti, [[time[0] - bintime / 2, time[-1] + bintime / 2]])
start_bins, stop_bins = \
decide_spectrum_lc_intervals(gti, fftlen, time)
results = _empty()
if return_all:
results.dynpds = []
results.edynpds = []
results.dynctrate = []
results.times = []
pds = 0
npds = len(start_bins)
    mask = np.zeros(len(lc), dtype=bool)
for start_bin, stop_bin in zip(start_bins, stop_bins):
l = lc[start_bin:stop_bin]
t0 = time[start_bin]
try:
assert np.sum(l) != 0, \
'Interval starting at time %.7f is bad. Check GTIs' % t0
f, p = leahy_pds(l, bintime)
except Exception as e:
warnings.warn(str(e))
npds -= 1
continue
if return_all:
results.dynpds.append(p)
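            # The error on a single Leahy-normalized power is estimated
            # by the power itself, hence the same array is stored here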
results.edynpds.append(p)
results.dynctrate.append(np.mean(l) / bintime)
results.times.append(time[start_bin])
pds += p
mask[start_bin:stop_bin] = True
pds /= npds
epds = pds / np.sqrt(npds)
ctrate = np.mean(lc[mask]) / bintime
results.f = f
results.pds = pds
results.epds = epds
results.npds = npds
results.ctrate = ctrate
return results
def leahy_cpds(lc1, lc2, bintime):
"""Calculate the cross power density spectrum.
Calculates the Cross Power Density Spectrum, normalized similarly to the
PDS in Leahy+1983, ApJ 266, 160., given the lightcurve and its bin time.
Assumes no gaps are present! Beware!
Parameters
----------
lc1 : array-like
The first light curve
lc2 : array-like
        The second light curve
    bintime : float
The bin time of the light curve
Returns
-------
freqs : array-like
Frequencies corresponding to PDS
cpds : array-like
The cross power density spectrum
cpdse : array-like
The error on the cross power density spectrum
pds1 : array-like
The power density spectrum of the first light curve
pds2 : array-like
The power density spectrum of the second light curve
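    Examples
    --------
    A minimal sketch, with two independent Poisson light curves:

    >>> lc1 = np.random.poisson(10, 64).astype(float)
    >>> lc2 = np.random.poisson(10, 64).astype(float)
    >>> f, cpds, cpdse, p1, p2 = leahy_cpds(lc1, lc2, bintime=0.1)
    >>> f.size == cpds.size == 32  # only non-negative frequencies
    True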
"""
assert len(lc1) == len(lc2), 'Light curves MUST have the same length!'
nph1 = sum(lc1)
nph2 = sum(lc2)
    # Sanity checks must be done beforehand: only good (non-empty) light
    # curves should reach this point
assert (nph1 > 0 and nph2 > 0), ('Invalid interval. At least one light '
'curve is empty')
freqs, ft1 = fft(lc1, bintime)
freqs, ft2 = fft(lc2, bintime)
pds1 = np.absolute(ft1.conjugate() * ft1) * 2. / nph1
pds2 = np.absolute(ft2.conjugate() * ft2) * 2. / nph2
pds1e = np.copy(pds1)
pds2e = np.copy(pds2)
# The "effective" count rate is the geometrical mean of the count rates
# of the two light curves
nph = np.sqrt(nph1 * nph2)
# I'm pretty sure there is a faster way to do this.
if nph != 0:
cpds = ft1.conjugate() * ft2 * 2. / nph
else:
cpds = np.zeros(len(freqs))
# Justification in timing paper! (Bachetti et al. arXiv:1409.3248)
# This only works for cospectrum. For the cross spectrum, I *think*
# it's irrelevant
cpdse = np.sqrt(pds1e * pds2e) / np.sqrt(2.)
good = freqs >= 0
freqs = freqs[good]
cpds = cpds[good]
cpdse = cpdse[good]
return freqs, cpds, cpdse, pds1, pds2
def welch_cpds(time, lc1, lc2, bintime, fftlen, gti=None, return_all=False):
"""Calculate the CPDS, averaged over equal chunks of data.
Calculates the Cross Power Density Spectrum normalized like PDS, given the
lightcurve and its bin time, over equal chunks of length fftlen, and
returns the average of all PDSs, or the sum PDS and the number of chunks
Parameters
----------
time : array-like
Central times of light curve bins
lc1 : array-like
Light curve 1
lc2 : array-like
Light curve 2
bintime : float
Bin time of the light curve
fftlen : float
Length of each FFT
gti : [[g0_0, g0_1], [g1_0, g1_1], ...]
Good time intervals. Defaults to
[[time[0] - bintime/2, time[-1] + bintime/2]]
Returns
-------
    results : object
        An object containing all the return values below
    f : array-like
        array of frequencies corresponding to PDS bins
    cpds : array-like
        the values of the CPDS
    ecpds : array-like
        the errors on the CPDS values
    ncpds : int
        the number of summed CPDSs
    ctrate : float
        the average count rate in the two light curves
    dyncpds : array-like, optional
    edyncpds : array-like, optional
dynctrate : array-like, optional
times : array-like, optional
    Other Parameters
----------------
return_all : bool
if True, return everything, including the dynamical PDS
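    Examples
    --------
    A minimal sketch, mirroring the `welch_pds` example with two curves:

    >>> bintime = 0.1
    >>> time = np.arange(bintime / 2, 102.4, bintime)
    >>> lc1 = np.random.poisson(100, time.size).astype(float)
    >>> lc2 = np.random.poisson(100, time.size).astype(float)
    >>> results = welch_cpds(time, lc1, lc2, bintime, fftlen=25.6)
    >>> results.ncpds >= 1
    True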
"""
gti = _assign_value_if_none(
gti, [[time[0] - bintime / 2, time[-1] + bintime / 2]])
start_bins, stop_bins = \
decide_spectrum_lc_intervals(gti, fftlen, time)
cpds = 0
ecpds = 0
npds = len(start_bins)
    mask = np.zeros(len(lc1), dtype=bool)
results = _empty()
if return_all:
results.dyncpds = []
results.edyncpds = []
results.dynctrate = []
results.times = []
for start_bin, stop_bin in zip(start_bins, stop_bins):
l1 = lc1[start_bin:stop_bin]
l2 = lc2[start_bin:stop_bin]
t0 = time[start_bin]
try:
assert np.sum(l1) != 0 and np.sum(l2) != 0, \
'Interval starting at time %.7f is bad. Check GTIs' % t0
f, p, pe, p1, p2 = leahy_cpds(l1, l2, bintime)
except Exception as e:
warnings.warn(str(e))
npds -= 1
continue
cpds += p
ecpds += pe ** 2
if return_all:
results.dyncpds.append(p)
results.edyncpds.append(pe)
results.dynctrate.append(
np.sqrt(np.mean(l1)*np.mean(l2)) / bintime)
results.times.append(time[start_bin])
mask[start_bin:stop_bin] = True
cpds /= npds
ecpds = np.sqrt(ecpds) / npds
ctrate = np.sqrt(np.mean(lc1[mask])*np.mean(lc2[mask])) / bintime
results.f = f
results.cpds = cpds
results.ecpds = ecpds
results.ncpds = npds
results.ctrate = ctrate
return results
def rms_normalize_pds(pds, pds_err, source_ctrate, back_ctrate=None):
"""Normalize a Leahy PDS with RMS normalization ([1]_, [2]_).
Parameters
----------
pds : array-like
The Leahy-normalized PDS
pds_err : array-like
The uncertainties on the PDS values
source_ctrate : float
The source count rate
    back_ctrate : float, optional
The background count rate
Returns
-------
pds : array-like
the RMS-normalized PDS
pds_err : array-like
the uncertainties on the PDS values
References
----------
.. [1] Belloni & Hasinger 1990, A&A, 230, 103
.. [2] Miyamoto+1991, ApJ, 383, 784
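    Examples
    --------
    The scaling factor is (source_ctrate + back_ctrate) / source_ctrate**2;
    e.g., with no background:

    >>> pds, pds_err = rms_normalize_pds(np.array([2., 2.]),
    ...                                  np.array([2., 2.]),
    ...                                  source_ctrate=100.)
    >>> np.allclose(pds, 0.02)
    True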
"""
back_ctrate = _assign_value_if_none(back_ctrate, 0)
factor = (source_ctrate + back_ctrate) / source_ctrate ** 2
return pds * factor, pds_err * factor
def decide_spectrum_intervals(gtis, fftlen):
"""Decide the start times of PDSs.
Start each FFT/PDS/cospectrum from the start of a GTI, and stop before the
next gap.
Only use for events! This will give problems with binned light curves.
Parameters
----------
gtis : [[gti0_0, gti0_1], [gti1_0, gti1_1], ...]
fftlen : float
Length of the chunks
Returns
-------
spectrum_start_times : array-like
List of starting times to use in the spectral calculations.
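    Examples
    --------
    A minimal sketch; note that a GTI whose length exactly equals
    ``fftlen`` yields no start time, due to the `arange` upper boundary:

    >>> starts = decide_spectrum_intervals([[0., 25.], [30., 40.]],
    ...                                    fftlen=10.)
    >>> np.allclose(starts, [0, 10])
    True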
"""
spectrum_start_times = np.array([], dtype=np.longdouble)
for g in gtis:
if g[1] - g[0] < fftlen:
logging.info("GTI at %g--%g is Too short. Skipping." %
(g[0], g[1]))
continue
newtimes = np.arange(g[0], g[1] - fftlen, np.longdouble(fftlen),
dtype=np.longdouble)
spectrum_start_times = \
np.append(spectrum_start_times,
newtimes)
assert len(spectrum_start_times) > 0, \
"No GTIs are equal to or longer than fftlen. " + \
"Choose shorter fftlen (MPfspec -f <fftlen> <options> <filename>)"
return spectrum_start_times
def decide_spectrum_lc_intervals(gtis, fftlen, time):
"""Similar to decide_spectrum_intervals, but dedicated to light curves.
In this case, it is necessary to specify the time array containing the
times of the light curve bins.
Returns start and stop bins of the intervals to use for the PDS
Parameters
----------
gtis : [[gti0_0, gti0_1], [gti1_0, gti1_1], ...]
fftlen : float
Length of the chunks
time : array-like
Times of light curve bins
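    Examples
    --------
    A minimal sketch with a uniformly binned light curve:

    >>> time = np.arange(0.5, 100.5)  # 100 bins of width 1 s
    >>> starts, stops = decide_spectrum_lc_intervals([[0., 100.]], 20., time)
    >>> starts.tolist()
    [0, 20, 40, 60]
    >>> stops.tolist()
    [20, 40, 60, 80]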
"""
bintime = time[1] - time[0]
    nbin = int(fftlen / bintime)
    spectrum_start_bins = np.array([], dtype=int)
for g in gtis:
if g[1] - g[0] < fftlen:
logging.info("GTI at %g--%g is Too short. Skipping." %
(g[0], g[1]))
continue
startbin = np.argmin(np.abs(time - g[0]))
stopbin = np.argmin(np.abs(time - g[1]))
        newbins = np.arange(startbin, stopbin - nbin, nbin,
                            dtype=int)
spectrum_start_bins = \
np.append(spectrum_start_bins,
newbins)
assert len(spectrum_start_bins) > 0, \
"No GTIs are equal to or longer than fftlen. " + \
"Choose shorter fftlen (MPfspec -f <fftlen> <options> <filename>)"
return spectrum_start_bins, spectrum_start_bins + nbin
def calc_pds(lcfile, fftlen,
save_dyn=False,
bintime=1,
pdsrebin=1,
normalization='Leahy',
back_ctrate=0.,
noclobber=False,
outname=None):
"""Calculate the PDS from an input light curve file.
Parameters
----------
lcfile : str
The light curve file
fftlen : float
The length of the chunks over which FFTs will be calculated, in seconds
Other Parameters
----------------
save_dyn : bool
If True, save the dynamical power spectrum
bintime : float
The bin time. If different from that of the light curve, a rebinning is
performed
pdsrebin : int
Rebin the PDS of this factor.
normalization : str
'Leahy' or 'rms'
back_ctrate : float
The non-source count rate
noclobber : bool
If True, do not overwrite existing files
outname : str
        If specified, output file name. If not specified or None, the new file
        will have the same root as the input light curve and the '_pds' suffix
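    Examples
    --------
    A hypothetical call (the input file name is illustrative and must point
    to a valid MaLTPyNT light curve file):

    >>> calc_pds('lc' + MP_FILE_EXTENSION, fftlen=128,
    ...          bintime=1 / 64, normalization='rms')  # doctest: +SKIP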
"""
    root = mp_root(lcfile)
    outname = _assign_value_if_none(
        outname, root + '_pds' + MP_FILE_EXTENSION)
if noclobber and os.path.exists(outname):
print('File exists, and noclobber option used. Skipping')
return
logging.info("Loading file %s..." % lcfile)
lcdata = load_lcurve(lcfile)
time = lcdata['time']
mjdref = lcdata['MJDref']
try:
lc = lcdata['lccorr']
    except KeyError:
lc = lcdata['lc']
dt = lcdata['dt']
gti = lcdata['GTI']
instr = lcdata['Instr']
tctrate = lcdata['total_ctrate']
if bintime <= dt:
bintime = dt
else:
lcrebin = np.rint(bintime / dt)
bintime = lcrebin * dt
logging.info("Rebinning lc by a factor %d" % lcrebin)
time, lc, dum = \
const_rebin(time, lc, lcrebin, normalize=False)
results = welch_pds(time, lc, bintime, fftlen, gti, return_all=True)
freq = results.f
pds = results.pds
epds = results.epds
npds = results.npds
ctrate = results.ctrate
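    # Drop the zero-frequency (DC) bin before rebinning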
freq, pds, epds = const_rebin(freq[1:], pds[1:], pdsrebin,
epds[1:])
if normalization == 'rms':
logging.info('Applying %s normalization' % normalization)
pds, epds = \
rms_normalize_pds(pds, epds,
source_ctrate=ctrate,
back_ctrate=back_ctrate)
for ic, pd in enumerate(results.dynpds):
ep = results.edynpds[ic].copy()
ct = results.dynctrate[ic].copy()
pd, ep = rms_normalize_pds(pd, ep, source_ctrate=ct,
back_ctrate=back_ctrate)
results.edynpds[ic][:] = ep
results.dynpds[ic][:] = pd
outdata = {'time': time[0], 'pds': pds, 'epds': epds, 'npds': npds,
'fftlen': fftlen, 'Instr': instr, 'freq': freq,
'rebin': pdsrebin, 'norm': normalization, 'ctrate': ctrate,
'total_ctrate': tctrate,
'back_ctrate': back_ctrate, 'MJDref': mjdref}
if 'Emin' in lcdata.keys():
outdata['Emin'] = lcdata['Emin']
outdata['Emax'] = lcdata['Emax']
if 'PImin' in lcdata.keys():
outdata['PImin'] = lcdata['PImin']
outdata['PImax'] = lcdata['PImax']
logging.debug(repr(results.dynpds))
if save_dyn:
outdata["dynpds"] = np.array(results.dynpds)[:, 1:]
outdata["edynpds"] = np.array(results.edynpds)[:, 1:]
outdata["dynctrate"] = np.array(results.dynctrate)
outdata["dyntimes"] = np.array(results.times)
logging.info('Saving PDS to %s' % outname)
save_pds(outdata, outname)
def calc_cpds(lcfile1, lcfile2, fftlen,
save_dyn=False,
bintime=1,
pdsrebin=1,
outname='cpds' + MP_FILE_EXTENSION,
normalization='Leahy',
back_ctrate=0.,
noclobber=False):
"""Calculate the CPDS from a pair of input light curve files.
Parameters
----------
lcfile1 : str
The first light curve file
lcfile2 : str
The second light curve file
fftlen : float
The length of the chunks over which FFTs will be calculated, in seconds
Other Parameters
----------------
save_dyn : bool
If True, save the dynamical power spectrum
bintime : float
The bin time. If different from that of the light curve, a rebinning is
performed
pdsrebin : int
Rebin the PDS of this factor.
normalization : str
'Leahy' or 'rms'. Default 'Leahy'
back_ctrate : float
The non-source count rate
noclobber : bool
If True, do not overwrite existing files
outname : str
Output file name for the cpds. Default: cpds.[nc|p]
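    Examples
    --------
    A hypothetical call (file names are illustrative):

    >>> calc_cpds('lcA' + MP_FILE_EXTENSION, 'lcB' + MP_FILE_EXTENSION,
    ...           fftlen=128,
    ...           outname='cpds' + MP_FILE_EXTENSION)  # doctest: +SKIP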
"""
if noclobber and os.path.exists(outname):
print('File exists, and noclobber option used. Skipping')
return
logging.info("Loading file %s..." % lcfile1)
lcdata1 = load_lcurve(lcfile1)
logging.info("Loading file %s..." % lcfile2)
lcdata2 = load_lcurve(lcfile2)
time1 = lcdata1['time']
try:
lc1 = lcdata1['lccorr']
    except KeyError:
lc1 = lcdata1['lc']
dt1 = lcdata1['dt']
gti1 = lcdata1['GTI']
instr1 = lcdata1['Instr']
tctrate1 = lcdata1['total_ctrate']
mjdref = lcdata1['MJDref']
time2 = lcdata2['time']
try:
lc2 = lcdata2['lccorr']
    except KeyError:
lc2 = lcdata2['lc']
dt2 = lcdata2['dt']
gti2 = lcdata2['GTI']
instr2 = lcdata2['Instr']
tctrate2 = lcdata2['total_ctrate']
tctrate = np.sqrt(tctrate1 * tctrate2)
assert instr1 != instr2, ('Did you check the ordering of files? '
'These are both ' + instr1)
assert dt1 == dt2, 'Light curves are sampled differently'
dt = dt1
if bintime <= dt:
bintime = dt
else:
        lcrebin = np.rint(bintime / dt)
        bintime = lcrebin * dt
logging.info("Rebinning lcs by a factor %d" % lcrebin)
time1, lc1, dum = \
const_rebin(time1, lc1, lcrebin, normalize=False)
time2, lc2, dum = \
const_rebin(time2, lc2, lcrebin, normalize=False)
gti = cross_gtis([gti1, gti2])
mask1 = create_gti_mask(time1, gti)
mask2 = create_gti_mask(time2, gti)
time1 = time1[mask1]
time2 = time2[mask2]
assert np.all(time1 == time2), "Something's not right in GTI filtering"
time = time1
del time2
lc1 = lc1[mask1]
lc2 = lc2[mask2]
results = welch_cpds(time, lc1, lc2, bintime, fftlen, gti,
return_all=True)
freq = results.f
cpds = results.cpds
ecpds = results.ecpds
ncpds = results.ncpds
ctrate = results.ctrate
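    # Drop the zero-frequency (DC) bin before rebinning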
freq, cpds, ecpds = const_rebin(freq[1:], cpds[1:], pdsrebin,
ecpds[1:])
if normalization == 'rms':
logging.info('Applying %s normalization' % normalization)
cpds, ecpds = \
rms_normalize_pds(cpds, ecpds,
source_ctrate=ctrate,
back_ctrate=back_ctrate)
for ic, cp in enumerate(results.dyncpds):
ec = results.edyncpds[ic].copy()
ct = results.dynctrate[ic].copy()
cp, ec = rms_normalize_pds(cp, ec, source_ctrate=ct,
back_ctrate=back_ctrate)
results.edyncpds[ic][:] = ec
results.dyncpds[ic][:] = cp
outdata = {'time': gti[0][0], 'cpds': cpds, 'ecpds': ecpds, 'ncpds': ncpds,
'fftlen': fftlen, 'Instrs': instr1 + ',' + instr2,
'freq': freq, 'rebin': pdsrebin, 'norm': normalization,
'ctrate': ctrate, 'total_ctrate': tctrate,
'back_ctrate': back_ctrate, 'MJDref': mjdref}
if 'Emin' in lcdata1.keys():
outdata['Emin1'] = lcdata1['Emin']
outdata['Emax1'] = lcdata1['Emax']
if 'Emin' in lcdata2.keys():
outdata['Emin2'] = lcdata2['Emin']
outdata['Emax2'] = lcdata2['Emax']
if 'PImin' in lcdata1.keys():
outdata['PImin1'] = lcdata1['PImin']
outdata['PImax1'] = lcdata1['PImax']
if 'PImin' in lcdata2.keys():
outdata['PImin2'] = lcdata2['PImin']
outdata['PImax2'] = lcdata2['PImax']
logging.debug(repr(results.dyncpds))
if save_dyn:
outdata["dyncpds"] = np.array(results.dyncpds)[:, 1:]
outdata["edyncpds"] = np.array(results.edyncpds)[:, 1:]
outdata["dynctrate"] = np.array(results.dynctrate)
outdata["dyntimes"] = np.array(results.times)
logging.info('Saving CPDS to %s' % outname)
save_pds(outdata, outname)
def calc_fspec(files, fftlen,
do_calc_pds=True,
do_calc_cpds=True,
do_calc_cospectrum=True,
do_calc_lags=True,
save_dyn=False,
bintime=1,
pdsrebin=1,
outroot=None,
normalization='Leahy',
nproc=1,
back_ctrate=0.,
noclobber=False):
r"""Calculate the frequency spectra: the PDS, the cospectrum, ...
Parameters
----------
files : list of str
List of input file names
fftlen : float
length of chunks to perform the FFT on.
Other Parameters
----------------
save_dyn : bool
If True, save the dynamical power spectrum
bintime : float
The bin time. If different from that of the light curve, a rebinning is
performed
pdsrebin : int
Rebin the PDS of this factor.
normalization : str
        'Leahy' [3]_ or 'rms' [4]_ [5]_. Default 'Leahy'.
back_ctrate : float
The non-source count rate
noclobber : bool
If True, do not overwrite existing files
outroot : str
Output file name root
nproc : int
Number of processors to use to parallelize the processing of multiple
files
References
----------
    .. [3] Leahy et al. 1983, ApJ, 266, 160
    .. [4] Belloni & Hasinger 1990, A&A, 230, 103
    .. [5] Miyamoto et al. 1991, ApJ, 383, 784
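    Examples
    --------
    A hypothetical call on light curves from two instruments (file names
    are illustrative):

    >>> calc_fspec(['lcA' + MP_FILE_EXTENSION, 'lcB' + MP_FILE_EXTENSION],
    ...            fftlen=512, normalization='rms')  # doctest: +SKIP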
"""
if normalization not in ['Leahy', 'rms']:
logging.warning('Beware! Unknown normalization!')
normalization = 'Leahy'
logging.info('Using %s normalization' % normalization)
if do_calc_pds:
wrapped_file_dicts = []
for f in files:
wfd = {"fftlen": fftlen,
"save_dyn": save_dyn,
"bintime": bintime,
"pdsrebin": pdsrebin,
"normalization": normalization,
"back_ctrate": back_ctrate,
"noclobber": noclobber}
wfd["fname"] = f
wrapped_file_dicts.append(wfd)
if os.name == 'nt' or nproc == 1:
[_wrap_fun_pds(w) for w in wrapped_file_dicts]
else:
pool = Pool(processes=nproc)
for i in pool.imap_unordered(_wrap_fun_pds, wrapped_file_dicts):
pass
pool.close()
if not do_calc_cpds or len(files) < 2:
return
logging.info('Sorting file list')
sorted_files = sort_files(files)
    logging.warning('Beware! For cpds and derivatives, I assume that the '
                    'files are from only two instruments and in pairs '
                    '(even in random order)')
instrs = list(sorted_files.keys())
files1 = sorted_files[instrs[0]]
files2 = sorted_files[instrs[1]]
    assert len(files1) == len(files2), \
        'An equal number of files per instrument is needed'
argdict = {"fftlen": fftlen,
"save_dyn": save_dyn,
"bintime": bintime,
"pdsrebin": pdsrebin,
"normalization": normalization,
"back_ctrate": back_ctrate,
"noclobber": noclobber}
funcargs = []
for i_f, f in enumerate(files1):
f1, f2 = f, files2[i_f]
outdir = os.path.dirname(f1)
if outdir == '':
outdir = os.getcwd()
outr = _assign_value_if_none(
outroot,
common_name(f1, f2, default='%d' % i_f))
outname = os.path.join(outdir,
outr.replace(MP_FILE_EXTENSION, '') +
'_cpds' + MP_FILE_EXTENSION)
funcargs.append([f1, f2, outname, argdict])
if os.name == 'nt' or nproc == 1:
[_wrap_fun_cpds(fa) for fa in funcargs]
else:
pool = Pool(processes=nproc)
for i in pool.imap_unordered(_wrap_fun_cpds, funcargs):
pass
pool.close()
def read_fspec(fname):
"""Read the frequency spectrum from a file.
Parameters
----------
fname : str
The input file name
Returns
-------
ftype : str
File type
freq : array-like
Frequency array
fspec : array-like
Frequency spectrum array
efspec : array-like
Errors on spectral bins
nchunks : int
Number of spectra that have been summed to obtain fspec
    rebin : array-like or int
        Rebin factor in each bin. Might be irregular in case of geometrical
        binning
    contents : dict
        The full contents of the input file
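    Examples
    --------
    A hypothetical call (the file name is illustrative):

    >>> ftype, freq, pds, epds, npds, rebin, contents = \
    ...     read_fspec('pds' + MP_FILE_EXTENSION)  # doctest: +SKIP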
"""
ftype, contents = get_file_type(fname)
if 'freq' in list(contents.keys()):
freq = contents['freq']
elif 'flo' in list(contents.keys()):
flo = contents['flo']
fhi = contents['fhi']
freq = [flo, fhi]
ft = ftype.replace('reb', '')
pds = contents[ft]
epds = contents['e' + ft]
nchunks = contents['n' + ft]
rebin = contents['rebin']
return ftype, freq, pds, epds, nchunks, rebin, contents
def _normalize(array, ref=0):
m = ref
std = np.std(array)
newarr = np.zeros_like(array)
good = array > m
newarr[good] = (array[good] - ref) / std
return newarr
def dumpdyn(fname, plot=False):
"""Dump a dynamical frequency spectrum in text files.
Parameters
----------
fname : str
The file name
Other Parameters
----------------
plot : bool
if True, plot the spectrum
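    Examples
    --------
    A hypothetical call (the file name is illustrative):

    >>> dumpdyn('pds' + MP_FILE_EXTENSION, plot=False)  # doctest: +SKIP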
"""
ftype, pdsdata = get_file_type(fname, specify_reb=False)
dynpds = pdsdata['dyn' + ftype]
edynpds = pdsdata['edyn' + ftype]
try:
freq = pdsdata['freq']
    except KeyError:
flo = pdsdata['flo']
fhi = pdsdata['fhi']
freq = (fhi + flo) / 2
time = pdsdata['dyntimes']
freqs = np.zeros_like(dynpds)
times = np.zeros_like(dynpds)
for i, im in enumerate(dynpds):
freqs[i, :] = freq
times[i, :] = time[i]
t = times.real.flatten()
f = freqs.real.flatten()
d = dynpds.real.flatten()
e = edynpds.real.flatten()
np.savetxt('{0}_dumped_{1}.txt'.format(mp_root(fname), ftype),
np.array([t, f, d, e]).T)
size = _normalize(d)
if plot:
import matplotlib.pyplot as plt
plt.scatter(t, f, s=size)
plt.xlabel('Time (s)')
plt.ylabel('Freq (Hz)')
plt.show()
def dumpdyn_main(args=None):
"""Main function called by the `MPdumpdyn` command line script."""
import argparse
description = ('Dump dynamical (cross) power spectra')
parser = argparse.ArgumentParser(description=description)
parser.add_argument("files", help=("List of files in any valid MaLTPyNT "
"format for PDS or CPDS"), nargs='+')
parser.add_argument("--noplot", help="plot results",
default=False, action='store_true')
args = parser.parse_args(args)
fnames = args.files
for f in fnames:
dumpdyn(f, plot=not args.noplot)
def main(args=None):
"""Main function called by the `MPfspec` command line script."""
import argparse
    description = ('Create frequency spectra (PDS, CPDS, cospectrum) '
                   'starting from well-defined input light curves')
parser = argparse.ArgumentParser(description=description)
parser.add_argument("files", help="List of light curve files", nargs='+')
parser.add_argument("-b", "--bintime", type=float, default=1/4096,
help="Light curve bin time; if negative, interpreted" +
" as negative power of 2." +
" Default: 2^-10, or keep input lc bin time" +
" (whatever is larger)")
parser.add_argument("-r", "--rebin", type=int, default=1,
help="(C)PDS rebinning to apply. Default: none")
parser.add_argument("-f", "--fftlen", type=float, default=512,
help="Length of FFTs. Default: 512 s")
parser.add_argument("-k", "--kind", type=str, default="PDS,CPDS,cos",
help='Spectra to calculate, as comma-separated list' +
' (Accepted: PDS and CPDS;' +
' Default: "PDS,CPDS")')
parser.add_argument("--norm", type=str, default="Leahy",
help='Normalization to use' +
' (Accepted: Leahy and rms;' +
' Default: "Leahy")')
parser.add_argument("--noclobber", help="Do not overwrite existing files",
default=False, action='store_true')
parser.add_argument("-o", "--outroot", type=str, default=None,
help='Root of output file names for CPDS only')
parser.add_argument("--loglevel",
help=("use given logging level (one between INFO, "
"WARNING, ERROR, CRITICAL, DEBUG; "
"default:WARNING)"),
default='WARNING',
type=str)
parser.add_argument("--nproc",
help=("Number of processors to use"),
default=1,
type=int)
parser.add_argument("--back",
help=("Estimated background (non-source) count rate"),
default=0.,
type=float)
parser.add_argument("--debug", help="use DEBUG logging level",
default=False, action='store_true')
parser.add_argument("--save-dyn", help="save dynamical power spectrum",
default=False, action='store_true')
args = parser.parse_args(args)
if args.debug:
args.loglevel = 'DEBUG'
numeric_level = getattr(logging, args.loglevel.upper(), None)
logging.basicConfig(filename='MPfspec.log', level=numeric_level,
filemode='w')
    bintime = args.bintime
    if bintime < 0:
        # Interpret negative values as (negative) powers of 2, as stated
        # in the --bintime help
        bintime = 2 ** bintime
fftlen = args.fftlen
pdsrebin = args.rebin
normalization = args.norm
do_cpds = do_pds = do_cos = do_lag = False
kinds = args.kind.split(',')
for k in kinds:
if k == 'PDS':
do_pds = True
elif k == 'CPDS':
do_cpds = True
elif k == 'cos' or k == 'cospectrum':
do_cos = True
do_cpds = True
elif k == 'lag':
do_lag = True
do_cpds = True
calc_fspec(args.files, fftlen,
do_calc_pds=do_pds,
do_calc_cpds=do_cpds,
do_calc_cospectrum=do_cos,
do_calc_lags=do_lag,
save_dyn=args.save_dyn,
bintime=bintime,
pdsrebin=pdsrebin,
outroot=args.outroot,
normalization=normalization,
nproc=args.nproc,
back_ctrate=args.back,
noclobber=args.noclobber)