# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Light curve-related functions."""
from __future__ import (absolute_import, unicode_literals, division,
print_function)
import numpy as np
from .base import mp_root, create_gti_mask, cross_gtis, mkdir_p
from .base import contiguous_regions, calc_countrate, gti_len
from .base import _assign_value_if_none, _look_for_array_in_array
from .io import load_events, load_lcurve, save_lcurve
from .io import MP_FILE_EXTENSION, high_precision_keyword_read
import os
import logging
import warnings
def lcurve(event_list,
           bin_time,
           start_time=None,
           stop_time=None,
           centertime=True):
    """From a list of event times, extract a lightcurve.

    Parameters
    ----------
    event_list : array-like
        Times of arrival of events
    bin_time : float
        Binning time of the light curve

    Returns
    -------
    time : array-like
        The time bins of the light curve
    lc : array-like
        The light curve

    Other Parameters
    ----------------
    start_time : float
        Initial time of the light curve
    stop_time : float
        Stop time of the light curve
    centertime: bool
        If False, time is the start of the bin. Otherwise, the center
    """
    # Default limits: the whole event list, rounded down to integer seconds.
    if start_time is None:
        start_time = np.floor(event_list[0])
    if stop_time is None:
        stop_time = np.floor(event_list[-1])

    logging.debug("lcurve: Time limits: %g -- %g" %
                  (start_time, stop_time))

    new_event_list = event_list[event_list >= start_time]
    new_event_list = new_event_list[new_event_list <= stop_time]

    # To compute the histogram, the times array must specify the bin edges.
    # therefore, if nbin is the length of the lightcurve, times will have
    # nbin + 1 elements
    new_event_list = ((new_event_list - start_time) / bin_time).astype(int)

    times = np.arange(start_time, stop_time, bin_time)
    # NOTE(review): an event falling exactly at stop_time maps to bin index
    # len(times), making lc one element longer than times -- confirm whether
    # callers rely on equal lengths.
    lc = np.bincount(new_event_list, minlength=len(times))

    logging.debug("lcurve: Length of the lightcurve: %g" % len(times))
    logging.debug("Times, kind: %s, %s" % (repr(times), type(times[0])))
    logging.debug("Lc, kind: %s, %s" % (repr(lc), type(lc[0])))
    logging.debug("bin_time, kind: %s, %s" % (repr(bin_time), type(bin_time)))

    if centertime:
        times = times + bin_time / 2.

    # BUGFIX: np.float was deprecated in NumPy 1.20 and removed in 1.24;
    # the builtin float is the documented replacement.
    return times, lc.astype(float)
def join_lightcurves(lcfilelist, outfile='out_lc' + MP_FILE_EXTENSION):
    """Join light curves from different files.

    Light curves from different instruments are put in different channels.

    Parameters
    ----------
    lcfilelist :
    outfile :

    See Also
    --------
    scrunch_lightcurves : Create a single light curve from input light
        curves.
    """
    curves = []
    for fname in lcfilelist:
        logging.info("Loading file %s..." % fname)
        curves.append(load_lcurve(fname))
        logging.info("Done.")

    # --------------- Check consistency of data --------------
    # All input light curves must share a single bin time.
    unique_dts = list({c['dt'] for c in curves})
    assert len(unique_dts) == 1, 'Light curves must have same dt for scrunching'

    # One output light curve is produced per instrument.
    unique_instrs = list({c['Instr'] for c in curves})

    outlcs = {}
    pieces = {}
    for instr in unique_instrs:
        # NOTE(review): Tstart/Tstop start from 1e32 / -11, presumably
        # "beyond any mission time" sentinels for the min/max scan below --
        # kept exactly as found.
        outlcs[instr] = {'dt': unique_dts[0], 'Tstart': 1e32, 'Tstop': -11,
                         'MJDref': curves[0]['MJDref'], 'source_ctrate': 0,
                         'total_ctrate': 0}
        pieces[instr] = {'time': [], 'lc': [], 'gti': []}
    # -------------------------------------------------------

    for curve in curves:
        instr = curve['Instr']
        target = outlcs[instr]

        target['Tstart'] = min(target['Tstart'], curve['Tstart'])
        target['Tstop'] = max(target['Tstop'], curve['Tstop'])

        pieces[instr]['time'].extend(curve['time'])
        pieces[instr]['lc'].extend(curve['lc'])
        pieces[instr]['gti'].extend(curve['GTI'])

        # Weight the count rates by exposure so the final division below
        # yields the exposure-averaged rate.
        exposure = gti_len(curve['GTI'])
        target['source_ctrate'] += curve['source_ctrate'] * exposure
        target['total_ctrate'] += curve['total_ctrate'] * exposure

    for instr in unique_instrs:
        target = outlcs[instr]
        gti = np.array(pieces[instr]['gti'])
        target['time'] = np.array(pieces[instr]['time'])
        target['lc'] = np.array(pieces[instr]['lc'])
        target['GTI'] = gti
        target['Instr'] = instr

        total_exposure = gti_len(gti)
        target['source_ctrate'] /= total_exposure
        target['total_ctrate'] /= total_exposure

    if outfile is not None:
        for instr in unique_instrs:
            # With a single instrument, keep the file name unprefixed.
            tag = "" if len(unique_instrs) == 1 else instr
            logging.info('Saving joined light curve to %s' % outfile)
            dname, fname = os.path.split(outfile)
            save_lcurve(outlcs[instr], os.path.join(dname, tag + fname))

    return outlcs
def scrunch_lightcurves(lcfilelist, outfile='out_scrlc'+MP_FILE_EXTENSION,
                        save_joint=False):
    """Create a single light curve from input light curves.

    Light curves are appended when they cover different times, and summed when
    they fall in the same time range. This is done regardless of the channel
    or the instrument.

    Parameters
    ----------
    lcfilelist : list of str
        The list of light curve files to scrunch

    Returns
    -------
    time : array-like
        The time array
    lc :
        The new light curve
    gti : [[gti0_0, gti0_1], [gti1_0, gti1_1], ...]
        Good Time Intervals

    Other Parameters
    ----------------
    outfile : str
        The output file name
    save_joint : bool
        If True, save the per-channel joint light curves

    See Also
    --------
    join_lightcurves : Join light curves from different files
    """
    if save_joint:
        lcdata = join_lightcurves(lcfilelist)
    else:
        lcdata = join_lightcurves(lcfilelist, outfile=None)

    instrs = list(lcdata.keys())
    # Only keep times covered by *all* instruments.
    gti = cross_gtis([lcdata[i]['GTI'] for i in instrs])

    # Determine limits: the first instrument defines the reference grid.
    first = instrs[0]
    ref_mask = create_gti_mask(lcdata[first]['time'], gti)
    time0 = lcdata[first]['time'][ref_mask]
    lc0 = lcdata[first]['lc'][ref_mask]

    # Sum the remaining instruments onto the reference light curve.
    for inst in instrs[1:]:
        other_time = lcdata[inst]['time']
        other_mask = create_gti_mask(other_time, gti)
        assert np.all(time0 == other_time[other_mask]), \
            'Something is not right with gti filtering'
        lc0 += lcdata[inst]['lc'][other_mask]

    out = lcdata[first].copy()
    out['lc'] = lc0
    out['time'] = time0
    out['dt'] = lcdata[first]['dt']
    out['GTI'] = gti
    out['Instr'] = ",".join(instrs)
    out['source_ctrate'] = np.sum([lcdata[i]['source_ctrate'] for i in instrs])
    out['total_ctrate'] = np.sum([lcdata[i]['total_ctrate'] for i in instrs])

    logging.info('Saving scrunched light curve to %s' % outfile)
    save_lcurve(out, outfile)

    return time0, lc0, gti
def filter_lc_gtis(time, lc, gti, safe_interval=None, delete=False,
                   min_length=0, return_borders=False):
    """Filter a light curve for GTIs.

    Parameters
    ----------
    time : array-like
        The time bins of the light curve
    lc : array-like
        The light curve
    gti : [[gti0_0, gti0_1], [gti1_0, gti1_1], ...]
        Good Time Intervals

    Returns
    -------
    time : array-like
        The time bins of the light curve
    lc : array-like
        The output light curve
    newgtis : [[gti0_0, gti0_1], [gti1_0, gti1_1], ...]
        The output Good Time Intervals
    borders : [[i0_0, i0_1], [i1_0, i1_1], ...], optional
        The indexes of the light curve corresponding to the borders of the
        GTIs. Returned if return_borders is set to True

    Other Parameters
    ----------------
    safe_interval : float or [float, float]
        Seconds to filter out at the start and end of each GTI. If single
        float, these safe windows are equal, otherwise the two numbers refer
        to the start and end of the GTI respectively
    delete : bool
        If delete is True, the intervals outside of GTIs are filtered out from
        the light curve. Otherwise, they are set to zero.
    min_length : float
        Minimum length of GTI. GTIs below this length will be removed.
    return_borders : bool
        If True, return also the indexes of the light curve corresponding to
        the borders of the GTIs
    """
    mask, newgtis = create_gti_mask(time, gti, return_new_gtis=True,
                                    safe_interval=safe_interval,
                                    min_length=min_length)

    if delete:
        # Drop the bins outside GTIs altogether.
        time = time[mask]
        lc = lc[mask]
    else:
        # Keep the full time grid, but zero the light curve outside GTIs.
        # NOTE: this modifies the input ``lc`` array in place.
        lc[np.logical_not(mask)] = 0

    if not return_borders:
        return time, lc, newgtis

    # Indices bracketing each contiguous good stretch of the light curve.
    borders = contiguous_regions(create_gti_mask(time, newgtis))
    return time, lc, newgtis, borders
def lcurve_from_events(f, safe_interval=0,
                       pi_interval=None,
                       e_interval=None,
                       min_length=0,
                       gti_split=False,
                       ignore_gtis=False,
                       bintime=1.,
                       outdir=None,
                       outfile=None,
                       noclobber=False):
    """Bin an event list in a light curve.

    Parameters
    ----------
    f : str
        Input event file name
    bintime : float
        The bin time of the output light curve. If negative, interpreted as
        a (negative) power of two, e.g. -2 -> 2**-2 s

    Returns
    -------
    outfiles : list
        List of output light curves

    Other Parameters
    ----------------
    safe_interval : float or [float, float]
        Seconds to filter out at the start and end of each GTI. If single
        float, these safe windows are equal, otherwise the two numbers refer
        to the start and end of the GTI respectively
    pi_interval : [int, int]
        PI channel interval to select. Default None, meaning that all PI
        channels are used
    e_interval : [float, float]
        Energy interval to select (only works if event list is calibrated with
        `calibrate`). Default None
    min_length : float
        GTIs below this length will be filtered out
    gti_split : bool
        If True, create one light curve for each good time interval
    ignore_gtis : bool
        Ignore good time intervals, and get a single light curve that includes
        possible gaps
    outdir : str
        Output directory
    outfile : str
        Output file
    noclobber : bool
        If True, do not overwrite existing files
    """
    logging.info("Loading file %s..." % f)
    evdata = load_events(f)
    logging.info("Done.")

    if bintime < 0:
        bintime = 2 ** (bintime)
    bintime = np.longdouble(bintime)

    tag = ''
    out = {}
    tstart = evdata['Tstart']
    tstop = evdata['Tstop']
    events = evdata['time']
    instr = evdata['Instr']
    mjdref = evdata['MJDref']

    if instr == 'PCA':
        pcus = evdata['PCU']
    gtis = evdata['GTI']
    if ignore_gtis:
        gtis = np.array([[tstart, tstop]])

    out['MJDref'] = mjdref
    # make tstart and tstop multiples of bin times since MJDref
    tstart = np.ceil(tstart / bintime, dtype=np.longdouble) * bintime
    tstop = np.floor(tstop / bintime, dtype=np.longdouble) * bintime

    # First of all, calculate total count rate (no filtering applied)
    tot_time, tot_lc = lcurve(events, bintime, start_time=tstart,
                              stop_time=tstop)

    tot_time, tot_lc, newgtis, tot_borders = \
        filter_lc_gtis(tot_time, tot_lc, gtis,
                       safe_interval=safe_interval,
                       delete=False,
                       min_length=min_length,
                       return_borders=True)

    out['total_ctrate'] = calc_countrate(tot_time, tot_lc, bintime=bintime)

    # Then, apply filters
    if pi_interval is not None and np.all(np.array(pi_interval) > 0):
        pis = evdata['PI']
        good = np.logical_and(pis > pi_interval[0],
                              pis <= pi_interval[1])
        events = events[good]
        tag = '_PI%g-%g' % (pi_interval[0], pi_interval[1])
        # BUGFIX: these were previously read from e_interval, which is
        # normally None in this branch (TypeError), and recorded the same
        # value twice. Record the actual PI selection limits.
        out['PImin'] = pi_interval[0]
        out['PImax'] = pi_interval[1]
    elif e_interval is not None and np.all(np.array(e_interval) > 0):
        try:
            es = evdata['E']
        # Narrowed from a bare `except:`: a missing key is the failure mode
        # we want to translate into a user-facing error.
        except KeyError:
            raise \
                ValueError("No energy information is present in the file." +
                           " Did you run MPcalibrate?")
        good = np.logical_and(es > e_interval[0],
                              es <= e_interval[1])
        events = events[good]
        tag = '_E%g-%g' % (e_interval[0], e_interval[1])
        out['Emin'] = e_interval[0]
        out['Emax'] = e_interval[1]

    # Assign default value if None
    outfile = _assign_value_if_none(outfile, mp_root(f) + tag + '_lc')

    # Take out extension from name, if present, then give extension. This
    # avoids multiple extensions
    outfile = outfile.replace(MP_FILE_EXTENSION, '') + MP_FILE_EXTENSION
    outdir = _assign_value_if_none(
        outdir, os.path.dirname(os.path.abspath(f)))
    _, outfile = os.path.split(outfile)
    mkdir_p(outdir)
    outfile = os.path.join(outdir, outfile)

    if noclobber and os.path.exists(outfile):
        warnings.warn('File exists, and noclobber option used. Skipping')
        return [outfile]

    time, lc = lcurve(events, bintime, start_time=tstart,
                      stop_time=tstop)

    time, lc, newgtis, borders = \
        filter_lc_gtis(time, lc, gtis,
                       safe_interval=safe_interval,
                       delete=False,
                       min_length=min_length,
                       return_borders=True)

    if len(newgtis) == 0:
        warnings.warn(
            "No GTIs above min_length ({0}s) found.".format(min_length))
        return

    assert np.all(borders == tot_borders), \
        'Borders do not coincide: {0} {1}'.format(borders, tot_borders)

    out['source_ctrate'] = calc_countrate(time, lc, gtis=newgtis,
                                          bintime=bintime)

    if gti_split:
        outfiles = []
        logging.debug(borders)
        for ib, b in enumerate(borders):
            local_tag = tag + '_gti%d' % ib
            outf = mp_root(outfile) + local_tag + '_lc' + MP_FILE_EXTENSION
            if noclobber and os.path.exists(outf):
                warnings.warn(
                    'File exists, and noclobber option used. Skipping')
                outfiles.append(outf)
                # BUGFIX: without this `continue`, the existing file was
                # overwritten anyway and appended to outfiles twice.
                continue
            logging.debug(b)
            local_out = out.copy()
            local_out['lc'] = lc[b[0]:b[1]]
            local_out['time'] = time[b[0]:b[1]]
            local_out['dt'] = bintime
            local_gti = np.array([[time[b[0]], time[b[1] - 1]]])
            local_out['GTI'] = local_gti
            local_out['Tstart'] = time[b[0]]
            local_out['Tstop'] = time[b[1] - 1]
            local_out['Instr'] = instr
            local_out['source_ctrate'] = calc_countrate(time[b[0]:b[1]],
                                                        lc[b[0]:b[1]],
                                                        bintime=bintime)
            local_out['total_ctrate'] = calc_countrate(tot_time[b[0]:b[1]],
                                                       tot_lc[b[0]:b[1]],
                                                       bintime=bintime)
            if instr == 'PCA':
                local_out['nPCUs'] = len(set(pcus))
            logging.info('Saving light curve to %s' % outf)
            save_lcurve(local_out, outf)
            outfiles.append(outf)
    else:
        out['lc'] = lc
        out['time'] = time
        out['dt'] = bintime
        out['GTI'] = newgtis
        out['Tstart'] = tstart
        out['Tstop'] = tstop
        out['Instr'] = instr
        if instr == 'PCA':
            out['nPCUs'] = len(set(pcus))
        logging.info('Saving light curve to %s' % outfile)
        save_lcurve(out, outfile)
        outfiles = [outfile]

    # For consistency in return value
    return outfiles
def lcurve_from_fits(fits_file, gtistring='GTI',
                     timecolumn='TIME', ratecolumn=None, ratehdu=1,
                     fracexp_limit=0.9, outfile=None,
                     noclobber=False, outdir=None):
    """
    Load a lightcurve from a fits file and save it in MaLTPyNT format.

    .. note ::
        FITS light curve handling is still under testing.
        Absolute times might be incorrect depending on the light curve format.

    Parameters
    ----------
    fits_file : str
        File name of the input light curve in FITS format

    Returns
    -------
    outfile : [str]
        Returned as a list with a single element for consistency with
        `lcurve_from_events`

    Other Parameters
    ----------------
    gtistring : str
        Name of the GTI extension in the FITS file
    timecolumn : str
        Name of the column containing times in the FITS file
    ratecolumn : str
        Name of the column containing rates in the FITS file
    ratehdu : str or int
        Name or index of the FITS extension containing the light curve
    fracexp_limit : float
        Minimum exposure fraction allowed
    outfile : str
        Output file name
    noclobber : bool
        If True, do not overwrite existing files
    """
    logging.warning(
        """WARNING! FITS light curve handling is still under testing.
        Absolute times might be incorrect.""")
    # TODO:
    # treat consistently TDB, UTC, TAI, etc. This requires some documentation
    # reading. For now, we assume TDB
    from astropy.io import fits as pf
    from astropy.time import Time
    import numpy as np
    from .base import create_gti_from_condition

    outfile = _assign_value_if_none(outfile, mp_root(fits_file) + '_lc')
    outfile = outfile.replace(MP_FILE_EXTENSION, '') + MP_FILE_EXTENSION
    outdir = _assign_value_if_none(
        outdir, os.path.dirname(os.path.abspath(fits_file)))
    _, outfile = os.path.split(outfile)
    mkdir_p(outdir)
    outfile = os.path.join(outdir, outfile)

    if noclobber and os.path.exists(outfile):
        warnings.warn('File exists, and noclobber option used. Skipping')
        return [outfile]

    lchdulist = pf.open(fits_file)
    lctable = lchdulist[ratehdu].data

    # Units of header keywords
    tunit = lchdulist[ratehdu].header['TIMEUNIT']

    # NOTE: the `except Exception` clauses below were bare `except:`, which
    # also swallowed KeyboardInterrupt/SystemExit. The best-effort fallback
    # behavior is preserved.
    try:
        mjdref = high_precision_keyword_read(lchdulist[ratehdu].header,
                                             'MJDREF')
        mjdref = Time(mjdref, scale='tdb', format='mjd')
    except Exception:
        mjdref = None

    try:
        instr = lchdulist[ratehdu].header['INSTRUME']
    except Exception:
        instr = 'EXTERN'

    # ----------------------------------------------------------------
    # Trying to comply with all different formats of fits light curves.
    # It's a madness...
    try:
        tstart = high_precision_keyword_read(lchdulist[ratehdu].header,
                                             'TSTART')
        tstop = high_precision_keyword_read(lchdulist[ratehdu].header,
                                            'TSTOP')
    except Exception:
        raise Exception('TSTART and TSTOP need to be specified')

    # For nulccorr lcs this whould work
    try:
        timezero = high_precision_keyword_read(lchdulist[ratehdu].header,
                                               'TIMEZERO')
        # Sometimes timezero is "from tstart", sometimes it's an absolute time.
        # This tries to detect which case is this, and always consider it
        # referred to tstart
    except Exception:
        timezero = 0

    # BUGFIX: apply the NuSTAR default MJDREF unconditionally. It was
    # previously assigned only inside the `tunit == 'd'` branch, so
    # `mjdref.value` at the bottom crashed on files in seconds lacking an
    # MJDREF keyword.
    mjdref = _assign_value_if_none(
        mjdref, Time(np.longdouble('55197.00076601852'), scale='tdb',
                     format='mjd'))

    # for lcurve light curves this should instead work
    if tunit == 'd':
        # TODO:
        # Check this. For now, I assume TD (JD - 2440000.5).
        # This is likely wrong
        timezero = Time(2440000.5 + timezero, scale='tdb', format='jd')
        tstart = Time(2440000.5 + tstart, scale='tdb', format='jd')
        tstop = Time(2440000.5 + tstop, scale='tdb', format='jd')

        timezero = (timezero - mjdref).to('s').value
        tstart = (tstart - mjdref).to('s').value
        tstop = (tstop - mjdref).to('s').value

    if timezero > tstart:
        timezero -= tstart

    time = np.array(lctable.field(timecolumn), dtype=np.longdouble)
    if time[-1] < tstart:
        time += timezero + tstart
    else:
        time += timezero

    try:
        dt = high_precision_keyword_read(lchdulist[ratehdu].header,
                                         'TIMEDEL')
        if tunit == 'd':
            dt *= 86400
    except Exception:
        warnings.warn('Assuming that TIMEDEL is the difference between the'
                      ' first two times of the light curve')
        dt = time[1] - time[0]

    # ----------------------------------------------------------------
    ratecolumn = _assign_value_if_none(
        ratecolumn,
        _look_for_array_in_array(['RATE', 'RATE1', 'COUNTS'], lctable.names))

    # BUGFIX: np.float was removed in NumPy 1.24; use the builtin float.
    rate = np.array(lctable.field(ratecolumn), dtype=float)

    try:
        rate_e = np.array(lctable.field('ERROR'), dtype=np.longdouble)
    except Exception:
        rate_e = np.zeros_like(rate)

    if 'RATE' in ratecolumn:
        # Rates are per second: convert to counts per bin
        rate *= dt
        rate_e *= dt

    try:
        fracexp = np.array(lctable.field('FRACEXP'), dtype=np.longdouble)
    except Exception:
        fracexp = np.ones_like(rate)

    # Good bins have a finite rate (rate == rate filters NaNs) and a sane
    # exposure fraction. (A duplicate computation of this mask, immediately
    # overwritten, was removed.)
    good_intervals = (rate == rate) * (fracexp >= fracexp_limit) * \
        (fracexp <= 1)

    rate[good_intervals] /= fracexp[good_intervals]
    rate_e[good_intervals] /= fracexp[good_intervals]
    rate[np.logical_not(good_intervals)] = 0

    try:
        gtitable = lchdulist[gtistring].data
        gti_list = np.array([[a, b]
                             for a, b in zip(gtitable.field('START'),
                                             gtitable.field('STOP'))],
                            dtype=np.longdouble)
    except Exception:
        gti_list = create_gti_from_condition(time, good_intervals)

    lchdulist.close()

    out = {}
    out['lc'] = rate
    out['elc'] = rate_e
    out['time'] = time
    out['dt'] = dt
    out['GTI'] = gti_list
    out['Tstart'] = tstart
    out['Tstop'] = tstop
    out['Instr'] = instr
    out['MJDref'] = mjdref.value
    out['total_ctrate'] = calc_countrate(time, rate, gtis=gti_list,
                                         bintime=dt)
    out['source_ctrate'] = calc_countrate(time, rate, gtis=gti_list,
                                          bintime=dt)

    logging.info('Saving light curve to %s' % outfile)
    save_lcurve(out, outfile)
    return [outfile]
def lcurve_from_txt(txt_file, outfile=None,
                    noclobber=False, outdir=None):
    """
    Load a lightcurve from a text file.

    Parameters
    ----------
    txt_file : str
        File name of the input light curve in text format. Assumes two columns:
        time, counts. Times are seconds from MJDREF 55197.00076601852 (NuSTAR).

    Returns
    -------
    outfile : [str]
        Returned as a list with a single element for consistency with
        `lcurve_from_events`

    Other Parameters
    ----------------
    outfile : str
        Output file name
    noclobber : bool
        If True, do not overwrite existing files
    """
    import numpy as np

    outfile = _assign_value_if_none(outfile, mp_root(txt_file) + '_lc')
    outfile = outfile.replace(MP_FILE_EXTENSION, '') + MP_FILE_EXTENSION
    outdir = _assign_value_if_none(
        outdir, os.path.dirname(os.path.abspath(txt_file)))
    _, outfile = os.path.split(outfile)
    mkdir_p(outdir)
    outfile = os.path.join(outdir, outfile)

    if noclobber and os.path.exists(outfile):
        warnings.warn('File exists, and noclobber option used. Skipping')
        return [outfile]

    time, lc = np.genfromtxt(txt_file, delimiter=' ', unpack=True)
    time = np.array(time, dtype=np.longdouble)
    # BUGFIX: np.float was removed in NumPy 1.24; use the builtin float.
    lc = np.array(lc, dtype=float)

    # The bin time is inferred from the first two samples; the single GTI
    # covers the whole light curve, with half a bin of margin on each side.
    dt = time[1] - time[0]
    out = {}
    out['lc'] = lc
    out['time'] = time
    out['dt'] = dt
    gtis = np.array([[time[0] - dt / 2, time[-1] + dt / 2]])
    out['GTI'] = gtis
    out['Tstart'] = time[0] - dt / 2
    out['Tstop'] = time[-1] + dt / 2
    out['Instr'] = 'EXTERN'
    out['MJDref'] = np.longdouble('55197.00076601852')
    out['total_ctrate'] = calc_countrate(time, lc, gtis=gtis,
                                         bintime=dt)
    out['source_ctrate'] = calc_countrate(time, lc, gtis=gtis,
                                          bintime=dt)

    logging.info('Saving light curve to %s' % outfile)
    save_lcurve(out, outfile)
    return [outfile]
def _wrap_lc(args):
    """Call `lcurve_from_events`, trapping any exception (pool-safe wrapper)."""
    fname, kwargs = args
    result = []
    try:
        result = lcurve_from_events(fname, **kwargs)
    except Exception as exc:
        warnings.warn("MPlcurve exception: {0}".format(str(exc)))
    return result
def _wrap_txt(args):
    """Call `lcurve_from_txt`, trapping any exception (pool-safe wrapper)."""
    fname, kwargs = args
    result = []
    try:
        result = lcurve_from_txt(fname, **kwargs)
    except Exception as exc:
        warnings.warn("MPlcurve exception: {0}".format(str(exc)))
    return result
def _wrap_fits(args):
    """Call `lcurve_from_fits`, trapping any exception (pool-safe wrapper).

    Returns
    -------
    outfiles : list
        The list of output files, or an empty list on failure.
        BUGFIX: previously returned None on failure, unlike its siblings
        `_wrap_lc` and `_wrap_txt`, breaking callers that collect results.
    """
    f, kwargs = args
    try:
        return lcurve_from_fits(f, **kwargs)
    except Exception as e:
        warnings.warn("MPlcurve exception: {0}".format(str(e)))
        return []
def main(args=None):
    """Main function called by the `MPlcurve` command line script."""
    import argparse
    from multiprocessing import Pool

    description = ('Create lightcurves starting from event files. It is '
                   'possible to specify energy or channel filtering options')
    parser = argparse.ArgumentParser(description=description)

    parser.add_argument("files", help="List of files", nargs='+')
    parser.add_argument("-b", "--bintime", type=float, default=1,
                        help="Bin time; if negative, negative power of 2")
    parser.add_argument("--safe-interval", nargs=2, type=float,
                        default=[0, 0],
                        help="Interval at start and stop of GTIs used" +
                        " for filtering")
    parser.add_argument("--pi-interval", type=int, default=[-1, -1],
                        nargs=2,
                        help="PI interval used for filtering")
    parser.add_argument('-e', "--e-interval", type=float, default=[-1, -1],
                        nargs=2,
                        help="Energy interval used for filtering")
    parser.add_argument("-s", "--scrunch",
                        help="Create scrunched light curve (single channel)",
                        default=False,
                        action="store_true")
    parser.add_argument("-j", "--join",
                        help="Create joint light curve (multiple channels)",
                        default=False,
                        action="store_true")
    parser.add_argument("-g", "--gti-split",
                        help="Split light curve by GTI",
                        default=False,
                        action="store_true")
    parser.add_argument("--minlen",
                        help="Minimum length of acceptable GTIs (default:4)",
                        default=4, type=float)
    parser.add_argument("--ignore-gtis",
                        help="Ignore GTIs",
                        default=False,
                        action="store_true")
    parser.add_argument("-d", "--outdir", type=str, default=None,
                        help='Output directory')
    parser.add_argument("-o", "--outfile", type=str, default=None,
                        help='Output file name')
    parser.add_argument("--loglevel",
                        help=("use given logging level (one between INFO, "
                              "WARNING, ERROR, CRITICAL, DEBUG; "
                              "default:WARNING)"),
                        default='WARNING',
                        type=str)
    parser.add_argument("--nproc",
                        help=("Number of processors to use"),
                        default=1,
                        type=int)
    parser.add_argument("--debug", help="use DEBUG logging level",
                        default=False, action='store_true')
    parser.add_argument("--noclobber", help="Do not overwrite existing files",
                        default=False, action='store_true')
    parser.add_argument("--fits-input",
                        help="Input files are light curves in FITS format",
                        default=False, action='store_true')
    parser.add_argument("--txt-input",
                        help="Input files are light curves in txt format",
                        default=False, action='store_true')

    args = parser.parse_args(args)

    if args.debug:
        args.loglevel = 'DEBUG'
    numeric_level = getattr(logging, args.loglevel.upper(), None)
    logging.basicConfig(filename='MPlcurve.log', level=numeric_level,
                        filemode='w')

    # ------ Select the worker matching the input format, with its kwargs ----
    if args.fits_input:
        wrap_fun = _wrap_fits
        argdict = {"noclobber": args.noclobber}
    elif args.txt_input:
        wrap_fun = _wrap_txt
        argdict = {"noclobber": args.noclobber}
    else:
        wrap_fun = _wrap_lc
        argdict = {"noclobber": args.noclobber,
                   "safe_interval": args.safe_interval,
                   "pi_interval": np.array(args.pi_interval),
                   "e_interval": np.array(args.e_interval),
                   "min_length": args.minlen,
                   "gti_split": args.gti_split,
                   "ignore_gtis": args.ignore_gtis,
                   "bintime": args.bintime,
                   "outdir": args.outdir}

    arglist = [[f, argdict.copy()] for f in args.files]

    if args.outfile is not None:
        # With one input file, use the given name (extension stripped);
        # with several, append an index to each.
        stem = os.path.splitext(args.outfile)[0]
        for i, entry in enumerate(arglist):
            if len(arglist) > 1:
                entry[1]['outfile'] = args.outfile + "_{0}".format(i)
            else:
                entry[1]['outfile'] = stem
    # -------------------------------------------------------------------------

    if os.name == 'nt' or args.nproc == 1:
        # Windows has no fork; also avoid pool overhead for a single process.
        outfiles = [wrap_fun(entry) for entry in arglist]
    else:
        pool = Pool(processes=args.nproc)
        outfiles = list(pool.imap_unordered(wrap_fun, arglist))
        pool.close()

    logging.debug(outfiles)

    # NOTE(review): each wrapper returns a *list* of file names, so
    # ``outfiles`` is a list of lists here -- verify that
    # scrunch_lightcurves/join_lightcurves accept that shape.
    if args.scrunch:
        scrunch_lightcurves(outfiles)

    if args.join:
        join_lightcurves(outfiles)
def scrunch_main(args=None):
    """Main function called by the `MPscrunchlc` command line script."""
    import argparse

    description = \
        'Sum lightcurves from different instruments or energy ranges'
    parser = argparse.ArgumentParser(description=description)

    parser.add_argument("files", help="List of files", nargs='+')
    parser.add_argument("-o", "--out", type=str,
                        default="out_scrlc" + MP_FILE_EXTENSION,
                        help='Output file')
    parser.add_argument("--loglevel",
                        help=("use given logging level (one between INFO, "
                              "WARNING, ERROR, CRITICAL, DEBUG; "
                              "default:WARNING)"),
                        default='WARNING',
                        type=str)
    parser.add_argument("--debug", help="use DEBUG logging level",
                        default=False, action='store_true')

    args = parser.parse_args(args)

    if args.debug:
        args.loglevel = 'DEBUG'
    level = getattr(logging, args.loglevel.upper(), None)
    logging.basicConfig(filename='MPscrunchlc.log', level=level,
                        filemode='w')

    scrunch_lightcurves(args.files, args.out)