Source code for maltpynt.exposure

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Calculate the exposure correction for light curves.

Only works for data taken in specific data modes of NuSTAR, where all events
are telemetered.
"""
from __future__ import (absolute_import, unicode_literals, division,
                        print_function)

import numpy as np
from .io import load_events_and_gtis
from .io import get_file_type, save_lcurve, MP_FILE_EXTENSION
from .base import create_gti_mask, mp_root, _assign_value_if_none
import logging
import warnings


[docs]def get_livetime_per_bin(times, events, priors, dt=None, gti=None): """Get the livetime in a series of time intervals. Parameters ---------- times : array-like The array of times to look at events : array-like A list of events, producing dead time priors : array-like The livetime before each event (as in the PRIOR column of unfiltered NuSTAR event files) Returns ------- livetime_array : array-like An array of the same length as times, containing the live time values Other Parameters ---------------- dt : float or array-like The width of the time bins of the time array. Can be a single float or an array of the same length as times gti : [[g0_0, g0_1], [g1_0, g1_1], ...] Good time intervals. Defaults to [[time[0] - dt[0]/2, time[-1] + dt[-1]/2]] """ assert len(events) == len(priors), \ "`events` and `priors` must be of the same length" dt = _assign_value_if_none(dt, np.median(np.diff(times))) try: len(dt) except: dt = dt + np.zeros(len(times)) # Floating point events, starting from events[0] ev_fl = np.array(events - events[0], dtype=np.float64) pr_fl = np.array(priors, dtype=np.float64) # Start of livetime livetime_starts = ev_fl - pr_fl # Time bin borders: start from half a bin before tstart, end half a bin # after tstop tbins = np.array( np.append(times - dt / 2, [times[-1] + dt[-1] / 2]) - events[0], dtype=np.float64) tbin_starts = tbins[:-1] # Filter points outside of range of light curve filter = (ev_fl > tbins[0]) & (livetime_starts < tbins[-1]) ev_fl = ev_fl[filter] pr_fl = pr_fl[filter] livetime_starts = livetime_starts[filter] livetime_array = np.zeros_like(times) # ------ Normalize priors at the start and end of light curve ---------- before_start = \ (livetime_starts < tbin_starts[0]) & (ev_fl > tbin_starts[0]) livetime_starts[before_start] = tbins[0] + 1e-9 pr_fl[before_start] = ev_fl[before_start] - livetime_starts[before_start] after_end = \ (livetime_starts < tbins[-1]) & (ev_fl > tbins[-1]) ev_fl[after_end] = tbins[-1] - 1e-9 pr_fl[after_end] = ev_fl[after_end] - livetime_starts[after_end] # ---------------------------------------------------------------------- # Find bins to which "livetime starts" and "events" belong lts_bin = np.searchsorted(tbin_starts, livetime_starts, 'right') - 1 ev_bin = np.searchsorted(tbin_starts, ev_fl, 'right') - 1 # First of all, just consider livetimes and events inside the same bin. first_pass = ev_bin == lts_bin expo, bins = np.histogram(ev_fl[first_pass], bins=tbins, weights=pr_fl[first_pass]) assert np.all(expo) >= 0, expo livetime_array += expo # Now, let's consider the case where livetime starts some bins before. # We start from the most distant (max_bin_diff) and we arrive to 1. max_bin_diff = np.max(ev_bin - lts_bin) for bin_diff in range(max_bin_diff, 0, -1): idxs = ev_bin == lts_bin + bin_diff # Filter only events relevant to this case ev_bin_good = ev_bin[idxs] lts_bin_good = lts_bin[idxs] ev_good = ev_fl[idxs] lt_good = livetime_starts[idxs] # find corresponding time bins e_idx = np.searchsorted(tbin_starts, ev_good, 'right') - 1 _tbins = tbin_starts[e_idx] livetime_array[ev_bin_good] += ev_good - _tbins assert np.all(ev_good - _tbins >= 0), \ "Invalid boundaries. Contact the developer: {}".format( ev_good - _tbins) l_idx = np.searchsorted(tbin_starts, lt_good, 'right') _tbins = tbin_starts[l_idx] livetime_array[lts_bin_good] += _tbins - lt_good assert np.all(_tbins - lt_good >= 0), \ "Invalid boundaries. Contact the developer: {}".format( _tbins - lt_good) # Complete bins if bin_diff > 1: for i in range(1, bin_diff): livetime_array[lts_bin_good + i] += \ dt[lts_bin_good + i] return livetime_array
def _plot_dead_time_from_uf(uf_file, outroot="expo"): import matplotlib.pyplot as plt from matplotlib.gridspec import GridSpec from numpy import histogram additional_columns = ["PRIOR", "PI", "SHIELD", "SHLD_T", "SHLD_HI"] data = load_events_and_gtis(uf_file, additional_columns=additional_columns) events = data.ev_list additional = data.additional_data priors = additional["PRIOR"] dead_times = np.diff(events) - priors[1:] shields = additional["SHIELD"][1:] shld_t = additional["SHLD_T"][1:] shld_hi = additional["SHLD_HI"][1:] bins = np.percentile(dead_times, np.linspace(0, 100, 1000)) hist_all, bins_all = histogram(dead_times, bins=bins, density=True) hist_shield, bins_shield = histogram(dead_times[shields > 0], bins=bins, density=True) hist_noshield, bins_noshield = histogram(dead_times[shields == 0], bins=bins, density=True) hist_shld_hi, bins_shld_hi = histogram(dead_times[shld_hi > 0], bins=bins, density=True) bin_centers = bins[:-1] + np.diff(bins) / 2 fig = plt.figure("Dead time distribution", figsize=(10, 10)) gs = GridSpec(2, 1, hspace=0) ax1 = plt.subplot(gs[0]) ax1.loglog(bin_centers, hist_all, drawstyle="steps-mid", label="all") ax1.loglog(bin_centers, hist_shield, drawstyle="steps-mid", label="shield") ax1.loglog(bin_centers, hist_shld_hi, drawstyle="steps-mid", label="shld_hi") ax1.loglog(bin_centers, hist_noshield, drawstyle="steps-mid", label="no shield") ax1.set_ylabel("Occurrences (arbitrary units)") ax1.legend() ax2 = plt.subplot(gs[1], sharex=ax1) for sht in set(shld_t[shld_t > 0]): hs, bs = histogram(dead_times[shld_t == sht], bins=bins, density=True) ax2.loglog(bin_centers, hs, drawstyle="steps-mid", label="shield time {}".format(sht)) ax2.set_xlabel("Dead time (s)") ax2.set_ylabel("Occurrences (arbitrary units)") ax2.legend() plt.draw() fig.savefig(outroot + "_deadt_distr.png")
[docs]def get_exposure_from_uf(time, uf_file, dt=None, gti=None): """Get livetime from unfiltered event file. Parameters ---------- time : array-like The time bins of the light curve uf_file : str Unfiltered event file (the one in the event_cl directory with the _uf suffix) Returns ------- expo : array-like Exposure (livetime) values corresponding to time bins Other Parameters ---------------- dt : float If time array is not sampled uniformly, dt can be specified here. """ dt = _assign_value_if_none(dt, np.median(np.diff(time))) additional_columns = ["PRIOR", "PI"] data = load_events_and_gtis(uf_file, additional_columns=additional_columns) events = data.ev_list additional = data.additional_data priors = additional["PRIOR"] # grade = additional["GRADE"] # pis = additional["PI"] # xs = additional["X"] # ys = additional["Y"] # # filt = (grade < 32) & (pis >= 0) & (x is not None) & (y is not None) expo = get_livetime_per_bin(time, events, priors, dt, gti=gti) return expo
def _plot_corrected_light_curve(time, lc, expo, gti=None, outroot="expo"): import matplotlib.pyplot as plt good = create_gti_mask(time, gti) fig = plt.figure("Exposure-corrected lc") plt.plot(time[good], expo[good] / np.max(expo) * np.max(lc[good]), label="Exposure (arbitrary units)", zorder=10) plt.plot(time[good], lc[good], label="Light curve", zorder=20) plt.plot(time[good], lc[good] / expo[good], label="Exposure-corrected Light curve") plt.legend() fig.savefig(outroot + "_corr_lc.png")
[docs]def correct_lightcurve(lc_file, uf_file, outname=None, expo_limit=1e-7): """Apply exposure correction to light curve. Parameters ---------- lc_file : str The light curve file, in MaLTPyNT format uf_file : str The unfiltered event file, in FITS format Returns ------- outdata : str Output data structure Other Parameters ---------------- outname : str Output file name """ outname = _assign_value_if_none( outname, mp_root(lc_file) + "_lccorr" + MP_FILE_EXTENSION) ftype, contents = get_file_type(lc_file) time = contents["time"] lc = contents["lc"] dt = contents["dt"] gti = contents["GTI"] expo = get_exposure_from_uf(time, uf_file, dt=dt, gti=gti) outdata = contents.copy() newlc = np.array(lc / expo * dt, dtype=np.float64) newlc[expo < expo_limit] = 0 outdata["lc"] = newlc outdata["expo"] = expo save_lcurve(outdata, outname) return outdata
[docs]def main(args=None): """Main function called by the `MPexposure` command line script.""" import argparse description = ( 'Create exposure light curve based on unfiltered event files.') parser = argparse.ArgumentParser(description=description) parser.add_argument("lcfile", help="Light curve file (MaltPyNT format)") parser.add_argument("uffile", help="Unfiltered event file (FITS)") parser.add_argument("-o", "--outroot", type=str, default=None, help='Root of output file names') parser.add_argument("--loglevel", help=("use given logging level (one between INFO, " "WARNING, ERROR, CRITICAL, DEBUG; " "default:WARNING)"), default='WARNING', type=str) parser.add_argument("--debug", help="use DEBUG logging level", default=False, action='store_true') parser.add_argument("--plot", help="Plot on window", default=False, action='store_true') args = parser.parse_args(args) if args.debug: args.loglevel = 'DEBUG' numeric_level = getattr(logging, args.loglevel.upper(), None) logging.basicConfig(filename='MPexposure.log', level=numeric_level, filemode='w') lc_file = args.lcfile uf_file = args.uffile outroot = _assign_value_if_none(args.outroot, mp_root(lc_file)) outname = outroot + "_lccorr" + MP_FILE_EXTENSION outdata = correct_lightcurve(lc_file, uf_file, outname) time = outdata["time"] lc = outdata["lc"] expo = outdata["expo"] gti = outdata["GTI"] try: _plot_corrected_light_curve(time, lc * expo, expo, gti, outroot) _plot_dead_time_from_uf(uf_file, outroot) except Exception as e: warnings.warn(str(e)) pass if args.plot: import matplotlib.pyplot as plt plt.show()