aboutsummaryrefslogtreecommitdiffstats
path: root/python/crosstalk_deprojection.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/crosstalk_deprojection.py')
-rwxr-xr-xpython/crosstalk_deprojection.py1726
1 files changed, 1726 insertions, 0 deletions
diff --git a/python/crosstalk_deprojection.py b/python/crosstalk_deprojection.py
new file mode 100755
index 0000000..dd099b4
--- /dev/null
+++ b/python/crosstalk_deprojection.py
@@ -0,0 +1,1726 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# References:
+# [1] Definition of RMF and ARF file formats
+# https://heasarc.gsfc.nasa.gov/docs/heasarc/caldb/docs/memos/cal_gen_92_002/cal_gen_92_002.html
+# [2] CIAO: Auxiliary Response File
+# http://cxc.harvard.edu/ciao/dictionary/arf.html
+# [3] CIAO: Redistribution Matrix File
+# http://cxc.harvard.edu/ciao/dictionary/rmf.html
+# [4] astropy - FITS format code
+# http://docs.astropy.org/en/stable/io/fits/usage/table.html#column-creation
+# [5] XSPEC - Spectral Fitting
+# https://heasarc.gsfc.nasa.gov/docs/xanadu/xspec/manual/XspecSpectralFitting.html
+# [6] Direct X-ray Spectra Deprojection
+# https://www-xray.ast.cam.ac.uk/papers/dsdeproj/
+# Sanders & Fabian 2007, MNRAS, 381, 1381
+#
+#
+# Weitian LI
+# Created: 2016-03-26
+# Updated: 2016-04-19
+#
+# ChangeLog:
+# 2016-04-19:
+# * Ignore numpy error due to division by zero
+# * Update tool description and sample configuration
+# * Add two other main methods: `main_deprojection()' and `main_crosstalk()'
+# * Add argument 'group_squeeze' to some methods for better performance
+# * Rename from 'correct_crosstalk.py' to 'crosstalk_deprojection.py'
+# 2016-04-18:
+# * Implement deprojection function: class Deprojection
+# * Support spectral grouping (supply the grouping specification)
+# * Add grouping, estimate_errors, copy, randomize, etc. methods
+# * Utilize the Monte Carlo techniques to estimate the final spectral errors
+# * Collect all ARFs and RMFs within dictionaries
+# 2016-04-06:
+# * Fix `RMF: get_rmfimg()' for XMM EPIC RMF
+# 2016-04-02:
+# * Interpolate ARF in order to match the spectral channel energies
+# * Add version and date information
+# * Update documentations
+# * Update header history contents
+# 2016-04-01:
+# * Greatly update the documentations (e.g., description, sample config)
+# * Add class `RMF'
+# * Add method `get_energy()' for class `ARF'
+# * Split out class `SpectrumSet' from `Spectrum'
+# * Implement background subtraction
+# * Add config `subtract_bkg' and corresponding argument
+#
+# XXX/FIXME:
+# * Deprojection: account for ARF differences across different regions
+#
+# TODO:
+# * Split classes ARF, RMF, Spectrum, and SpectrumSet to a separate module
+#
+
+__version__ = "0.5.0"
+__date__ = "2016-04-19"
+
+
+"""
+Correct the crosstalk effect of XMM spectra by subtracting the photons
+that scattered from the surrounding regions due to the finite PSF, and
+by compensating the photons that scattered to the surrounding regions,
+according to the generated crosstalk ARFs by SAS `arfgen'.
+
+After the crosstalk effect being corrected, the deprojection is performed
+to deproject the crosstalk-corrected spectra to derive the spectra with
+both the crosstalk effect and projection effect corrected.
+
+
+Sample config file (in `ConfigObj' syntax):
+-----------------------------------------------------------
+# operation mode: deprojection, crosstalk, or both (default)
+mode = both
+# supply a *groupped* spectrum (from which the "GROUPING" and "QUALITY"
+# are used to group all the following spectra)
+grouping = spec_grp.pi
+# whether to subtract the background before crosstalk correction
+subtract_bkg = True
+# whether to fix the negative channel values due to spectral subtractions
+fix_negative = False
+# Monte Carlo times for spectral error estimation
+mc_times = 5000
+# show progress details and verbose information
+verbose = True
+# overwrite existing files
+clobber = False
+
+[reg1]
+...
+
+[reg2]
+outfile = deprojcc_reg2.pi
+spec = reg2.pi
+arf = reg2.arf
+rmf = reg2.rmf
+bkg = reg2_bkg.pi
+ [[cross_in]]
+ [[[in1]]]
+ spec = reg1.pi
+ arf = reg1.arf
+ rmf = reg1.rmf
+ bkg = reg1_bkg.pi
+ cross_arf = reg_1-2.arf
+ [[[in2]]]
+ spec = reg3.pi
+ arf = reg3.arf
+ rmf = reg3.rmf
+ bkg = reg3_bkg.pi
+ cross_arf = reg_3-2.arf
+ [[cross_out]]
+ cross_arf = reg_2-1.arf, reg_2-3.arf
+
+[...]
+...
+-----------------------------------------------------------
+"""
+
+WARNING = """
+********************************* WARNING ************************************
+The generated spectra are substantially modified (e.g., scale, add, subtract),
+therefore, take special care when interpretating the fitting results,
+especially the metal abundances and normalizations.
+******************************************************************************
+"""
+
+
+import sys
+import os
+import argparse
+from datetime import datetime
+from copy import copy
+
+import numpy as np
+import scipy as sp
+import scipy.interpolate
+from astropy.io import fits
+from configobj import ConfigObj
+
+
+def group_data(data, grouping):
+ """
+ Group the data with respect to the supplied `grouping' specification
+ (i.e., "GROUPING" columns of a spectrum). The channel counts of the
+ same group are summed up and assigned to the FIRST channel of this
+ group, while the OTHRE channels are all set to ZERO.
+ """
+ data_grp = np.array(data).copy()
+ for i in reversed(range(len(data))):
+ if grouping[i] == 1:
+ # the beginning channel of a group
+ continue
+ else:
+ # other channels of a group
+ data_grp[i-1] += data_grp[i]
+ data_grp[i] = 0
+ assert np.isclose(sum(data_grp), sum(data))
+ return data_grp
+
+
+class ARF: # {{{
+ """
+ Class to handle the ARF (ancillary/auxiliary response file),
+ which contains the combined instrumental effective area
+ (telescope/filter/detector) and the quantum efficiency (QE) as a
+ function of energy averaged over time.
+ The effective area is [cm^2], and the QE is [counts/photon]; they are
+ multiplied together to create the ARF, resulting in [cm^2 counts/photon].
+
+ **CAVEAT/NOTE**:
+ Generally, the "ENERG_LO" and "ENERG_HI" columns of an ARF are *different*
+ to the "E_MIN" and "E_MAX" columns of a RMF (which are corresponding
+ to the spectrum channel energies).
+ For the XMM EPIC *pn* and Chandra *ACIS*, the generated ARF does NOT have
+ the same number of data points to that of spectral channels, i.e., the
+ "ENERG_LO" and "ENERG_HI" columns of ARF is different to the "E_MIN" and
+ "E_MAX" columns of RMF.
+ Therefore it is necessary to interpolate and extrapolate the ARF curve
+ in order to match the spectrum (or RMF "EBOUNDS" extension).
+ As for the XMM EPIC *MOS1* and *MOS2*, the ARF data points match the
+ spectral channels, i.e., the energy positions of each ARF data point and
+ spectral channel are consistent. Thus the interpolation is not needed.
+
+ References:
+ [1] CIAO: Auxiliary Response File
+ http://cxc.harvard.edu/ciao/dictionary/arf.html
+ [2] Definition of RMF and ARF file formats
+ https://heasarc.gsfc.nasa.gov/docs/heasarc/caldb/docs/memos/cal_gen_92_002/cal_gen_92_002.html
+ """
+ filename = None
+ fitsobj = None
+ # only consider the "SPECTRUM" extension
+ header = None
+ energ_lo = None
+ energ_hi = None
+ specresp = None
+ # function of the interpolated ARF
+ f_interp = None
+ # energies of the spectral channels
+ energy_channel = None
+ # spectral channel grouping specification
+ grouping = None
+ groupped = False
+ # groupped ARF channels with respect to the grouping
+ specresp_grp = None
+
+ def __init__(self, filename):
+ self.filename = filename
+ self.fitsobj = fits.open(filename)
+ ext_specresp = self.fitsobj["SPECRESP"]
+ self.header = ext_specresp.header
+ self.energ_lo = ext_specresp.data["ENERG_LO"]
+ self.energ_hi = ext_specresp.data["ENERG_HI"]
+ self.specresp = ext_specresp.data["SPECRESP"]
+
+ def get_data(self, groupped=False, group_squeeze=False, copy=True):
+ if groupped:
+ specresp = self.specresp_grp
+ if group_squeeze:
+ specresp = specresp[self.grouping == 1]
+ else:
+ specresp = self.specresp
+ if copy:
+ return specresp.copy()
+ else:
+ return specresp
+
+ def get_energy(self, mean="geometric"):
+ """
+ Return the mean energy values of the ARF.
+
+ Arguments:
+ * mean: type of the mean energy:
+ + "geometric": geometric mean, i.e., e = sqrt(e_min*e_max)
+ + "arithmetic": arithmetic mean, i.e., e = 0.5*(e_min+e_max)
+ """
+ if mean == "geometric":
+ energy = np.sqrt(self.energ_lo * self.energ_hi)
+ elif mean == "arithmetic":
+ energy = 0.5 * (self.energ_lo + self.energ_hi)
+ else:
+ raise ValueError("Invalid mean type: %s" % mean)
+ return energy
+
+ def interpolate(self, x=None, verbose=False):
+ """
+ Cubic interpolate the ARF curve using `scipy.interpolate'
+
+ If the requested point is outside of the data range, the
+ fill value of *zero* is returned.
+
+ Arguments:
+ * x: points at which the interpolation to be calculated.
+
+ Return:
+ If x is None, then the interpolated function is returned,
+ otherwise, the interpolated data are returned.
+ """
+ if not hasattr(self, "f_interp") or self.f_interp is None:
+ energy = self.get_energy()
+ arf = self.get_data(copy=False)
+ if verbose:
+ print("INFO: interpolating '%s' (this may take a while) ..." \
+ % self.filename, file=sys.stderr)
+ f_interp = sp.interpolate.interp1d(energy, arf, kind="cubic",
+ bounds_error=False, fill_value=0.0, assume_sorted=True)
+ self.f_interp = f_interp
+ if x is not None:
+ return self.f_interp(x)
+ else:
+ return self.f_interp
+
+ def apply_grouping(self, energy_channel, grouping, verbose=False):
+ """
+ Group the ARF channels (INTERPOLATED with respect to the spectral
+ channels) by the supplied grouping specification.
+
+ Arguments:
+ * energy_channel: energies of the spectral channel
+ * grouping: spectral grouping specification
+
+ Return: `self.specresp_grp'
+ """
+ if self.groupped:
+ return
+ if verbose:
+ print("INFO: Grouping spectrum '%s' ..." % self.filename,
+ file=sys.stderr)
+ self.energy_channel = energy_channel
+ self.grouping = grouping
+ # interpolate the ARF w.r.t the spectral channel energies
+ arf_interp = self.interpolate(x=energy_channel, verbose=verbose)
+ self.specresp_grp = group_data(arf_interp, grouping)
+ self.groupped = True
+# class ARF }}}
+
+
+class RMF: # {{{
+ """
+ Class to handle the RMF (redistribution matrix file),
+ which maps from energy space into detector pulse height (or position)
+ space. Since detectors are not perfect, this involves a spreading of
+ the observed counts by the detector resolution, which is expressed as
+ a matrix multiplication.
+ For X-ray spectral analysis, the RMF encodes the probability R(E,p)
+ that a detected photon of energy E will be assisgned to a given
+ channel value (PHA or PI) of p.
+
+ The standard Legacy format [2] for the RMF uses a binary table in which
+ each row contains R(E,p) for a single value of E as a function of p.
+ Non-zero sequences of elements of R(E,p) are encoded using a set of
+ variable length array columns. This format is compact but hard to
+ manipulate and understand.
+
+ **CAVEAT/NOTE**:
+ + See also the above ARF CAVEAT/NOTE.
+ + The "EBOUNDS" extension contains the `CHANNEL', `E_MIN' and `E_MAX'
+ columns. This `CHANNEL' is the same as that of a spectrum. Therefore,
+ the energy values determined from the `E_MIN' and `E_MAX' columns are
+ used to interpolate and extrapolate the ARF curve.
+ + The `ENERG_LO' and `ENERG_HI' columns of the "MATRIX" extension are
+ the same as that of a ARF.
+
+ References:
+ [1] CIAO: Redistribution Matrix File
+ http://cxc.harvard.edu/ciao/dictionary/rmf.html
+ [2] Definition of RMF and ARF file formats
+ https://heasarc.gsfc.nasa.gov/docs/heasarc/caldb/docs/memos/cal_gen_92_002/cal_gen_92_002.html
+ """
+ filename = None
+ fitsobj = None
+ ## extension "MATRIX"
+ hdr_matrix = None
+ energ_lo = None
+ energ_hi = None
+ n_grp = None
+ f_chan = None
+ n_chan = None
+ # raw squeezed RMF matrix data
+ matrix = None
+ ## extension "EBOUNDS"
+ hdr_ebounds = None
+ channel = None
+ e_min = None
+ e_max = None
+ ## converted 2D RMF matrix/image from the squeezed binary table
+ # size: len(energ_lo) x len(channel)
+ rmfimg = None
+
+ def __init__(self, filename):
+ self.filename = filename
+ self.fitsobj = fits.open(filename)
+ ## "MATRIX" extension
+ ext_matrix = self.fitsobj["MATRIX"]
+ self.hdr_matrix = ext_matrix.header
+ self.energ_lo = ext_matrix.data["ENERG_LO"]
+ self.energ_hi = ext_matrix.data["ENERG_HI"]
+ self.n_grp = ext_matrix.data["N_GRP"]
+ self.f_chan = ext_matrix.data["F_CHAN"]
+ self.n_chan = ext_matrix.data["N_CHAN"]
+ self.matrix = ext_matrix.data["MATRIX"]
+ ## "EBOUNDS" extension
+ ext_ebounds = self.fitsobj["EBOUNDS"]
+ self.hdr_ebounds = ext_ebounds.header
+ self.channel = ext_ebounds.data["CHANNEL"]
+ self.e_min = ext_ebounds.data["E_MIN"]
+ self.e_max = ext_ebounds.data["E_MAX"]
+
+ def get_energy(self, mean="geometric"):
+ """
+ Return the mean energy values of the RMF "EBOUNDS".
+
+ Arguments:
+ * mean: type of the mean energy:
+ + "geometric": geometric mean, i.e., e = sqrt(e_min*e_max)
+ + "arithmetic": arithmetic mean, i.e., e = 0.5*(e_min+e_max)
+ """
+ if mean == "geometric":
+ energy = np.sqrt(self.e_min * self.e_max)
+ elif mean == "arithmetic":
+ energy = 0.5 * (self.e_min + self.e_max)
+ else:
+ raise ValueError("Invalid mean type: %s" % mean)
+ return energy
+
+ def get_rmfimg(self):
+ """
+ Convert the RMF data in squeezed binary table (standard Legacy format)
+ to a 2D image/matrix.
+ """
+ def _make_rmfimg_row(n_channel, dtype, f_chan, n_chan, mat_row):
+ # make sure that `f_chan' and `n_chan' are 1-D numpy array
+ f_chan = np.array(f_chan).reshape(-1)
+ f_chan -= 1 # FITS indices are 1-based
+ n_chan = np.array(n_chan).reshape(-1)
+ idx = np.concatenate([ np.arange(f, f+n) \
+ for f, n in zip(f_chan, n_chan) ])
+ rmfrow = np.zeros(n_channel, dtype=dtype)
+ rmfrow[idx] = mat_row
+ return rmfrow
+ #
+ if self.rmfimg is None:
+ # Make the 2D RMF matrix/image
+ n_energy = len(self.energ_lo)
+ n_channel = len(self.channel)
+ rmf_dtype = self.matrix[0].dtype
+ rmfimg = np.zeros(shape=(n_energy, n_channel), dtype=rmf_dtype)
+ for i in np.arange(n_energy)[self.n_grp > 0]:
+ rmfimg[i, :] = _make_rmfimg_row(n_channel, rmf_dtype,
+ self.f_chan[i], self.n_chan[i], self.matrix[i])
+ self.rmfimg = rmfimg
+ return self.rmfimg
+
+ def write_rmfimg(self, outfile, clobber=False):
+ rmfimg = self.get_rmfimg()
+ # merge headers
+ header = self.hdr_matrix.copy(strip=True)
+ header.extend(self.hdr_ebounds.copy(strip=True))
+ outfits = fits.PrimaryHDU(data=rmfimg, header=header)
+ outfits.writeto(outfile, checksum=True, clobber=clobber)
+# class RMF }}}
+
+
+class Spectrum: # {{{
+ """
+ Class that deals with the X-ray spectrum file (usually *.pi).
+ """
+ filename = None
+ # FITS object return by `fits.open()'
+ fitsobj = None
+ # header of "SPECTRUM" extension
+ header = None
+ # "SPECTRUM" extension data
+ channel = None
+ # name of the spectrum data column (i.e., type, "COUNTS" or "RATE")
+ spec_type = None
+ # unit of the spectrum data ("count" for "COUNTS", "count/s" for "RATE")
+ spec_unit = None
+ # spectrum data
+ spec_data = None
+ # estimated spectral errors for each channel/group
+ spec_err = None
+ # statistical errors for each channel/group
+ stat_err = None
+ # grouping and quality
+ grouping = None
+ quality = None
+ # whether the spectral data being groupped
+ groupped = False
+ # several important keywords
+ EXPOSURE = None
+ BACKSCAL = None
+ RESPFILE = None
+ ANCRFILE = None
+ BACKFILE = None
+ # numpy dtype and FITS format code of the spectrum data
+ spec_dtype = None
+ spec_fits_format = None
+ # output filename for writing the spectrum if no filename provided
+ outfile = None
+
+ def __init__(self, filename, outfile=None):
+ self.filename = filename
+ self.fitsobj = fits.open(filename)
+ ext_spec = self.fitsobj["SPECTRUM"]
+ self.header = ext_spec.header.copy(strip=True)
+ colnames = ext_spec.columns.names
+ if "COUNTS" in colnames:
+ self.spec_type = "COUNTS"
+ elif "RATE" in colnames:
+ self.spec_type = "RATE"
+ else:
+ raise ValueError("Invalid spectrum file")
+ self.channel = ext_spec.data.columns["CHANNEL"].array
+ col_spec_data = ext_spec.data.columns[self.spec_type]
+ self.spec_data = col_spec_data.array.copy()
+ self.spec_unit = col_spec_data.unit
+ self.spec_dtype = col_spec_data.dtype
+ self.spec_fits_format = col_spec_data.format
+ # grouping and quality
+ if "GROUPING" in colnames:
+ self.grouping = ext_spec.data.columns["GROUPING"].array
+ if "QUALITY" in colnames:
+ self.quality = ext_spec.data.columns["QUALITY"].array
+ # keywords
+ self.EXPOSURE = self.header.get("EXPOSURE")
+ self.BACKSCAL = self.header.get("BACKSCAL")
+ self.AREASCAL = self.header.get("AREASCAL")
+ self.RESPFILE = self.header.get("RESPFILE")
+ self.ANCRFILE = self.header.get("ANCRFILE")
+ self.BACKFILE = self.header.get("BACKFILE")
+ # output filename
+ self.outfile = outfile
+
+ def get_data(self, group_squeeze=False, copy=True):
+ """
+ Get the spectral data (i.e., self.spec_data).
+
+ Arguments:
+ * group_squeeze: whether squeeze the spectral data according to
+ the grouping (i.e., exclude the channels that
+ are not the first channel of the group, which
+ also have value of ZERO).
+ This argument is effective only the grouping
+ being applied.
+ """
+ if group_squeeze and self.groupped:
+ spec_data = self.spec_data[self.grouping == 1]
+ else:
+ spec_data = self.spec_data
+ if copy:
+ return spec_data.copy()
+ else:
+ return spec_data
+
+ def get_channel(self, copy=True):
+ if copy:
+ return self.channel.copy()
+ else:
+ return self.channel
+
+ def set_data(self, spec_data, group_squeeze=True):
+ """
+ Set the spectral data of this spectrum to the supplied data.
+ """
+ if group_squeeze and self.groupped:
+ assert sum(self.grouping == 1) == len(spec_data)
+ self.spec_data[self.grouping == 1] = spec_data
+ else:
+ assert len(self.spec_data) == len(spec_data)
+ self.spec_data = spec_data.copy()
+
+ def add_stat_err(self, stat_err, group_squeeze=True):
+ """
+ Add the "STAT_ERR" column as the statistical errors of each spectral
+ group, which are estimated by utilizing the Monte Carlo techniques.
+ """
+ self.stat_err = np.zeros(self.spec_data.shape,
+ dtype=self.spec_data.dtype)
+ if group_squeeze and self.groupped:
+ assert sum(self.grouping == 1) == len(stat_err)
+ self.stat_err[self.grouping == 1] = stat_err
+ else:
+ assert len(self.stat_err) == len(stat_err)
+ self.stat_err = stat_err.copy()
+ self.header["POISSERR"] = False
+
+ def apply_grouping(self, grouping=None, quality=None):
+ """
+ Apply the spectral channel grouping specification to the spectrum.
+
+ NOTE:
+ * The spectral data (i.e., self.spec_data) is MODIFIED!
+ * The spectral data within the same group are summed up.
+ * The self grouping is overwritten if `grouping' is supplied, as well
+ as the self quality.
+ """
+ if grouping is not None:
+ self.grouping = grouping
+ if quality is not None:
+ self.quality = quality
+ self.spec_data = group_data(self.spec_data, self.grouping)
+ self.groupped = True
+
+ def estimate_errors(self, gehrels=True):
+ """
+ Estimate the statistical errors of each spectral group (after
+ applying grouping) for the source spectrum (and background spectrum).
+
+ If `gehrels=True', the statistical error for a spectral group with
+ N photons is given by `1 + sqrt(N + 0.75)'; otherwise, the error
+ is given by `sqrt(N)'.
+
+ Results: `self.spec_err'
+ """
+ eps = 1.0e-10
+ if gehrels:
+ self.spec_err = 1.0 + np.sqrt(self.spec_data + 0.75)
+ else:
+ self.spec_err = np.sqrt(self.spec_data)
+ # replace the zeros with a very small value (because
+ # `np.random.normal' requires `scale' > 0)
+ self.spec_err[self.spec_err <= 0.0] = eps
+
+ def copy(self):
+ """
+ Return a copy of this object, with the `np.ndarray' properties are
+ copied.
+ """
+ new = copy(self)
+ for k, v in self.__dict__.items():
+ if isinstance(v, np.ndarray):
+ setattr(new, k, v.copy())
+ return new
+
+ def randomize(self):
+ """
+ Randomize the spectral data according to the estimated spectral
+ group errors by assuming the normal distribution.
+
+ NOTE: this method should be called AFTER the `copy()' method.
+ """
+ if self.spec_err is None:
+ raise ValueError("No valid 'spec_err' presents")
+ if self.groupped:
+ idx = self.grouping == 1
+ self.spec_data[idx] = np.random.normal(self.spec_data[idx],
+ self.spec_err[idx])
+ else:
+ self.spec_data = np.random.normal(self.spec_data, self.spec_err)
+ return self
+
+ def reset_header_keywords(self,
+ keywords=["ANCRFILE", "RESPFILE", "BACKFILE"]):
+ """
+ Reset the keywords to "NONE" to avoid confusion or mistakes.
+ """
+ for kw in keywords:
+ if kw in self.header:
+ self.header[kw] = "NONE"
+
+ def write(self, filename=None, clobber=False):
+ """
+ Create a new "SPECTRUM" table/extension and replace the original
+ one, then write to output file.
+ """
+ if filename is None:
+ filename = self.outfile
+ columns = [
+ fits.Column(name="CHANNEL", format="I", array=self.channel),
+ fits.Column(name=self.spec_type, format=self.spec_fits_format,
+ unit=self.spec_unit, array=self.spec_data),
+ ]
+ if self.grouping is not None:
+ columns.append(fits.Column(name="GROUPING",
+ format="I", array=self.grouping))
+ if self.quality is not None:
+ columns.append(fits.Column(name="QUALITY",
+ format="I", array=self.quality))
+ if self.stat_err is not None:
+ columns.append(fits.Column(name="STAT_ERR", unit=self.spec_unit,
+ format=self.spec_fits_format,
+ array=self.stat_err))
+ ext_spec_cols = fits.ColDefs(columns)
+ ext_spec = fits.BinTableHDU.from_columns(ext_spec_cols,
+ header=self.header)
+ self.fitsobj["SPECTRUM"] = ext_spec
+ self.fitsobj.writeto(filename, clobber=clobber, checksum=True)
+# class Spectrum }}}
+
+
+class SpectrumSet(Spectrum): # {{{
+ """
+ This class handles a set of spectrum, including the source spectrum,
+ RMF, ARF, and the background spectrum.
+
+ **NOTE**:
+ The "COUNTS" column data are converted from "int32" to "float32",
+ since this spectrum will be subtracted/compensated according to the
+ ratios of ARFs.
+ """
+ # ARF object for this spectrum
+ arf = None
+ # RMF object for this spectrum
+ rmf = None
+ # background Spectrum object for this spectrum
+ bkg = None
+ # inner and outer radius of the region from which the spectrum extracted
+ radius_inner = None
+ radius_outer = None
+ # total angular range of the spectral region
+ angle = None
+
+ # numpy dtype and FITS format code to which the spectrum data be
+ # converted if the data is "COUNTS"
+ #_spec_dtype = np.float32
+ #_spec_fits_format = "E"
+ _spec_dtype = np.float64
+ _spec_fits_format = "D"
+
+ def __init__(self, filename, outfile=None, arf=None, rmf=None, bkg=None):
+ super().__init__(filename, outfile)
+ # convert spectrum data type if necessary
+ if self.spec_data.dtype != self._spec_dtype:
+ self.spec_data = self.spec_data.astype(self._spec_dtype)
+ self.spec_dtype = self._spec_dtype
+ self.spec_fits_format = self._spec_fits_format
+ if arf is not None:
+ if isinstance(arf, ARF):
+ self.arf = arf
+ else:
+ self.arf = ARF(arf)
+ if rmf is not None:
+ if isinstance(rmf, RMF):
+ self.rmf = rmf
+ else:
+ self.rmf = RMF(rmf)
+ if bkg is not None:
+ if isinstance(bkg, Spectrum):
+ self.bkg = bkg
+ else:
+ self.bkg = Spectrum(bkg)
+ # convert background spectrum data type if necessary
+ if self.bkg.spec_data.dtype != self._spec_dtype:
+ self.bkg.spec_data = self.bkg.spec_data.astype(self._spec_dtype)
+ self.bkg.spec_dtype = self._spec_dtype
+ self.bkg.spec_fits_format = self._spec_fits_format
+
+ def get_energy(self, mean="geometric"):
+ """
+ Get the energy values of each channel if RMF present.
+
+ NOTE:
+ The "E_MIN" and "E_MAX" columns of the RMF is required to calculate
+ the spectrum channel energies.
+ And the channel energies are generally different to the "ENERG_LO"
+ and "ENERG_HI" of the corresponding ARF.
+ """
+ if self.rmf is None:
+ return None
+ else:
+ return self.rmf.get_energy(mean=mean)
+
+ def get_arf(self, mean="geometric", groupped=True, copy=True):
+ """
+ Get the interpolated ARF data w.r.t the spectral channel energies
+ if the ARF presents.
+
+ Arguments:
+ * groupped: (bool) whether to get the groupped ARF
+
+ Return: (groupped) interpolated ARF data
+ """
+ if self.arf is None:
+ return None
+ else:
+ return self.arf.get_data(groupped=groupped, copy=copy)
+
+ def read_xflt(self):
+ """
+ Read the XFLT000# keywords from the header, check the validity (e.g.,
+ "XFLT0001" should equals "XFLT0002", "XFLT0003" should equals 0).
+ Sum all the additional XFLT000# pairs (e.g., ) which describes the
+ regions angluar ranges.
+ """
+ eps = 1.0e-6
+ xflt0001 = float(self.header["XFLT0001"])
+ xflt0002 = float(self.header["XFLT0002"])
+ xflt0003 = float(self.header["XFLT0003"])
+ # XFLT000# validity check
+ assert np.isclose(xflt0001, xflt0002)
+ assert abs(xflt0003) < eps
+ # outer radius of the region
+ self.radius_outer = xflt0001
+ # angular regions
+ self.angle = 0.0
+ num = 4
+ while True:
+ try:
+ angle_begin = float(self.header["XFLT%04d" % num])
+ angle_end = float(self.header["XFLT%04d" % (num+1)])
+ num += 2
+ except KeyError:
+ break
+ self.angle += (angle_end - angle_begin)
+ # if NO additional XFLT000# keys exist, assume "annulus" region
+ if self.angle < eps:
+ self.angle = 360.0
+
+ def scale(self):
+ """
+ Scale the spectral data (and spectral group errors if present) of
+ the source spectrum (and background spectra if present) according
+ to the region angular size to make it correspond to the whole annulus
+ region (i.e., 360 degrees).
+
+ NOTE: the spectral data and errors (i.e., `self.spec_data', and
+ `self.spec_err') is MODIFIED!
+ """
+ self.spec_data *= (360.0 / self.angle)
+ if self.spec_err is not None:
+ self.spec_err *= (360.0 / self.angle)
+ # also scale the background spectrum if present
+ if self.bkg:
+ self.bkg.spec_data *= (360.0 / self.angle)
+ if self.bkg.spec_err is not None:
+ self.bkg.spec_err *= (360.0 / self.angle)
+
+ def apply_grouping(self, grouping=None, quality=None, verbose=False):
+ """
+ Apply the spectral channel grouping specification to the source
+ spectrum, the ARF (which is used during the later spectral
+ manipulations), and the background spectrum (if presents).
+
+ NOTE:
+ * The spectral data (i.e., self.spec_data) is MODIFIED!
+ * The spectral data within the same group are summed up.
+ * The self grouping is overwritten if `grouping' is supplied, as well
+ as the self quality.
+ """
+ super().apply_grouping(grouping=grouping, quality=quality)
+ # also group the ARF accordingly
+ self.arf.apply_grouping(energy_channel=self.get_energy(),
+ grouping=self.grouping, verbose=verbose)
+ # group the background spectrum if present
+ if self.bkg:
+ self.bkg.spec_data = group_data(self.bkg.spec_data, self.grouping)
+
+ def estimate_errors(self, gehrels=True):
+ """
+ Estimate the statistical errors of each spectral group (after
+ applying grouping) for the source spectrum (and background spectrum).
+
+ If `gehrels=True', the statistical error for a spectral group with
+ N photons is given by `1 + sqrt(N + 0.75)'; otherwise, the error
+ is given by `sqrt(N)'.
+
+ Results: `self.spec_err' (and `self.bkg.spec_err')
+ """
+ super().estimate_errors(gehrels=gehrels)
+ eps = 1.0e-10
+ # estimate the errors for background spectrum if present
+ if self.bkg:
+ if gehrels:
+ self.bkg.spec_err = 1.0 + np.sqrt(self.bkg.spec_data + 0.75)
+ else:
+ self.bkg.spec_err = np.sqrt(self.bkg.spec_data)
+ self.bkg.spec_err[self.bkg.spec_err <= 0.0] = eps
+
+ def subtract_bkg(self, inplace=True, verbose=False):
+ """
+ Subtract the background contribution from the source spectrum.
+ The `EXPOSURE' and `BACKSCAL' values are required to calculate
+ the fraction/ratio for the background subtraction.
+
+ Arguments:
+ * inplace: whether replace the `spec_data' with the background-
+ subtracted spectrum data; If True, the attribute
+ `spec_bkg_subtracted' is also set to `True' when
+ the subtraction finished.
+ The keywords "BACKSCAL" and "AREASCAL" are set to 1.0.
+
+ Return:
+ background-subtracted spectrum data
+ """
+ ratio = (self.EXPOSURE / self.bkg.EXPOSURE) * \
+ (self.BACKSCAL / self.bkg.BACKSCAL) * \
+ (self.AREASCAL / self.bkg.AREASCAL)
+ operation = " SUBTRACT_BACKGROUND: %s - %s * %s" % \
+ (self.filename, ratio, self.bkg.filename)
+ if verbose:
+ print(operation, file=sys.stderr)
+ spec_data_subbkg = self.spec_data - ratio * self.bkg.get_data()
+ if inplace:
+ self.spec_data = spec_data_subbkg
+ self.spec_bkg_subtracted = True
+ self.BACKSCAL = 1.0
+ self.AREASCAL = 1.0
+ # also record history
+ self.header.add_history(operation)
+ return spec_data_subbkg
+
+ def subtract(self, spectrumset, cross_arf, groupped=False,
+ group_squeeze=False, verbose=False):
+ """
+ Subtract the photons that originate from the surrounding regions
+ but were scattered into this spectrum due to the finite PSF.
+
+ The background of this spectrum and the given spectrum should
+ both be subtracted before applying this subtraction for crosstalk
+ correction, as well as the below `compensate()' procedure.
+
+ NOTE:
+ 1. The crosstalk ARF must be provided, since the `spectrumset.arf'
+ is required to be its ARF without taking crosstalk into account:
+ spec1_new = spec1 - spec2 * (cross_arf_2_to_1 / arf2)
+ 2. The ARF are interpolated to match the energies of spetral channels.
+ """
+ operation = " SUBTRACT: %s - (%s/%s) * %s" % (self.filename,
+ cross_arf.filename, spectrumset.arf.filename,
+ spectrumset.filename)
+ if verbose:
+ print(operation, file=sys.stderr)
+ energy = self.get_energy()
+ if groupped:
+ spectrumset.arf.apply_grouping(energy_channel=energy,
+ grouping=self.grouping, verbose=verbose)
+ cross_arf.apply_grouping(energy_channel=energy,
+ grouping=self.grouping, verbose=verbose)
+ arfresp_spec = spectrumset.arf.get_data(groupped=True,
+ group_squeeze=group_squeeze)
+ arfresp_cross = cross_arf.get_data(groupped=True,
+ group_squeeze=group_squeeze)
+ else:
+ arfresp_spec = spectrumset.arf.interpolate(x=energy,
+ verbose=verbose)
+ arfresp_cross = cross_arf.interpolate(x=energy, verbose=verbose)
+ with np.errstate(divide="ignore", invalid="ignore"):
+ arf_ratio = arfresp_cross / arfresp_spec
+ # fix nan/inf values due to division by zero
+ arf_ratio[ ~ np.isfinite(arf_ratio) ] = 0.0
+ spec_data = self.get_data(group_squeeze=group_squeeze) - \
+ spectrumset.get_data(group_squeeze=group_squeeze)*arf_ratio
+ self.set_data(spec_data, group_squeeze=group_squeeze)
+ # record history
+ self.header.add_history(operation)
+
+ def compensate(self, cross_arf, groupped=False, group_squeeze=False,
+ verbose=False):
+ """
+ Compensate the photons that originate from this regions but were
+ scattered into the surrounding regions due to the finite PSF.
+
+ formula:
+ spec1_new = spec1 + spec1 * (cross_arf_1_to_2 / arf1)
+ """
+ operation = " COMPENSATE: %s + (%s/%s) * %s" % (self.filename,
+ cross_arf.filename, self.arf.filename, self.filename)
+ if verbose:
+ print(operation, file=sys.stderr)
+ energy = self.get_energy()
+ if groupped:
+ cross_arf.apply_grouping(energy_channel=energy,
+ grouping=self.grouping, verbose=verbose)
+ arfresp_this = self.arf.get_data(groupped=True,
+ group_squeeze=group_squeeze)
+ arfresp_cross = cross_arf.get_data(groupped=True,
+ group_squeeze=group_squeeze)
+ else:
+ arfresp_this = self.arf.interpolate(x=energy, verbose=verbose)
+ arfresp_cross = cross_arf.interpolate(x=energy, verbose=verbose)
+ with np.errstate(divide="ignore", invalid="ignore"):
+ arf_ratio = arfresp_cross / arfresp_this
+ # fix nan/inf values due to division by zero
+ arf_ratio[ ~ np.isfinite(arf_ratio) ] = 0.0
+ spec_data = self.get_data(group_squeeze=group_squeeze) + \
+ self.get_data(group_squeeze=group_squeeze) * arf_ratio
+ self.set_data(spec_data, group_squeeze=group_squeeze)
+ # record history
+ self.header.add_history(operation)
+
+ def fix_negative(self, verbose=False):
+ """
+ The subtractions may lead to negative counts, it may be necessary
+ to fix these channels with negative values.
+ """
+ neg_counts = self.spec_data < 0
+ N = len(neg_counts)
+ neg_channels = np.arange(N, dtype=np.int)[neg_counts]
+ if len(neg_channels) > 0:
+ print("WARNING: %d channels have NEGATIVE counts" % \
+ len(neg_channels), file=sys.stderr)
+ i = 0
+ while len(neg_channels) > 0:
+ i += 1
+ if verbose:
+ if i == 1:
+ print("*** Fixing negative channels: iter %d..." % i,
+ end="", file=sys.stderr)
+ else:
+ print("%d..." % i, end="", file=sys.stderr)
+ for ch in neg_channels:
+ neg_val = self.spec_data[ch]
+ if ch < N-2:
+ self.spec_data[ch] = 0
+ self.spec_data[(ch+1):(ch+3)] -= 0.5 * np.abs(neg_val)
+ else:
+ # just set to zero if it is the last 2 channels
+ self.spec_data[ch] = 0
+ # update negative channels indices
+ neg_counts = self.spec_data < 0
+ neg_channels = np.arange(N, dtype=np.int)[neg_counts]
+ if i > 0:
+ print("FIXED!", file=sys.stderr)
+ # record history
+ self.header.add_history(" FIXED NEGATIVE CHANNELS")
+
+ def set_radius_inner(self, radius_inner):
+ """
+ Set the inner radius of the spectral region.
+ """
+ assert radius_inner < self.radius_outer
+ self.radius_inner = radius_inner
+
+ def copy(self):
+ """
+ Return a copy of this object.
+ """
+ new = super().copy()
+ if self.bkg:
+ new.bkg = self.bkg.copy()
+ return new
+
+ def randomize(self):
+ """
+ Randomize the source (and background if present) spectral data
+ according to the estimated spectral group errors by assuming the
+ normal distribution.
+
+ NOTE: this method should be called AFTER the `copy()' method.
+ """
+ super().randomize()
+ if self.bkg:
+ self.bkg.spec_data = np.random.normal(self.bkg.spec_data,
+ self.bkg.spec_err)
+ self.bkg.spec_data[self.grouping == -1] = 0.0
+ return self
+# class SpectrumSet }}}
+
+
+class Crosstalk: # {{{
+ """
+ XMM-Newton PSF Crosstalk effect correction.
+ """
+ # `SpectrumSet' object for the spectrum to be corrected
+ spectrumset = None
+ # NOTE/XXX: do NOT use list (e.g., []) here, otherwise, all the
+ # instances will share these list properties.
+ # `SpectrumSet' and `ARF' objects corresponding to the spectra from
+ # which the photons were scattered into this spectrum.
+ cross_in_specset = None
+ cross_in_arf = None
+ # `ARF' objects corresponding to the regions to which the photons of
+ # this spectrum were scattered into.
+ cross_out_arf = None
+ # grouping specification and quality data
+ grouping = None
+ quality = None
+ # whether the spectrum is groupped
+ groupped = False
+
+ def __init__(self, config, arf_dict={}, rmf_dict={},
+ grouping=None, quality=None):
+ """
+ Arguments:
+ * config: a section of the whole config file (`ConfigObj' object)
+ """
+ self.cross_in_specset = []
+ self.cross_in_arf = []
+ self.cross_out_arf = []
+ # this spectrum to be corrected
+ self.spectrumset = SpectrumSet(filename=config["spec"],
+ outfile=config["outfile"],
+ arf=arf_dict.get(config["arf"], config["arf"]),
+ rmf=rmf_dict.get(config.get("rmf"), config.get("rmf")),
+ bkg=config.get("bkg"))
+ # spectra and cross arf from which photons were scattered in
+ for reg_in in config["cross_in"].values():
+ specset = SpectrumSet(filename=reg_in["spec"],
+ arf=arf_dict.get(reg_in["arf"], reg_in["arf"]),
+ rmf=rmf_dict.get(reg_in.get("rmf"), reg_in.get("rmf")),
+ bkg=reg_in.get("bkg"))
+ self.cross_in_specset.append(specset)
+ self.cross_in_arf.append(arf_dict.get(reg_in["cross_arf"],
+ ARF(reg_in["cross_arf"])))
+ # regions into which the photons of this spectrum were scattered into
+ if "cross_out" in config.sections:
+ cross_arf = config["cross_out"].as_list("cross_arf")
+ for arffile in cross_arf:
+ self.cross_out_arf.append(arf_dict.get(arffile, ARF(arffile)))
+ # grouping and quality
+ self.grouping = grouping
+ self.quality = quality
+
+ def apply_grouping(self, verbose=False):
+ self.spectrumset.apply_grouping(grouping=self.grouping,
+ quality=self.quality, verbose=verbose)
+ # also group the related surrounding spectra
+ for specset in self.cross_in_specset:
+ specset.apply_grouping(grouping=self.grouping,
+ quality=self.quality, verbose=verbose)
+ self.groupped = True
+
+ def estimate_errors(self, gehrels=True, verbose=False):
+ if verbose:
+ print("INFO: Estimating spectral errors ...")
+ self.spectrumset.estimate_errors(gehrels=gehrels)
+ # also estimate errors for the related surrounding spectra
+ for specset in self.cross_in_specset:
+ specset.estimate_errors(gehrels=gehrels)
+
+ def do_correction(self, subtract_bkg=True, fix_negative=False,
+ group_squeeze=True, verbose=False):
+ """
+ Perform the crosstalk correction. The background contribution
+ for each spectrum is subtracted first if `subtract_bkg' is True.
+ The basic correction procedures are recorded to the header.
+ """
+ self.spectrumset.header.add_history("Crosstalk Correction BEGIN")
+ self.spectrumset.header.add_history(" TOOL: %s (v%s) @ %s" % (\
+ os.path.basename(sys.argv[0]), __version__,
+ datetime.utcnow().isoformat()))
+ # background subtraction
+ if subtract_bkg:
+ if verbose:
+ print("INFO: subtract background ...", file=sys.stderr)
+ self.spectrumset.subtract_bkg(inplace=True, verbose=verbose)
+ # also apply background subtraction to the surrounding spectra
+ for specset in self.cross_in_specset:
+ specset.subtract_bkg(inplace=True, verbose=verbose)
+ # subtractions
+ if verbose:
+ print("INFO: apply subtractions ...", file=sys.stderr)
+ for specset, cross_arf in zip(self.cross_in_specset,
+ self.cross_in_arf):
+ self.spectrumset.subtract(spectrumset=specset,
+ cross_arf=cross_arf, groupped=self.groupped,
+ group_squeeze=group_squeeze, verbose=verbose)
+ # compensations
+ if verbose:
+ print("INFO: apply compensations ...", file=sys.stderr)
+ for cross_arf in self.cross_out_arf:
+ self.spectrumset.compensate(cross_arf=cross_arf,
+ groupped=self.groupped, group_squeeze=group_squeeze,
+ verbose=verbose)
+ # fix negative values in channels
+ if fix_negative:
+ if verbose:
+ print("INFO: fix negative channel values ...", file=sys.stderr)
+ self.spectrumset.fix_negative(verbose=verbose)
+ self.spectrumset.header.add_history("END Crosstalk Correction")
+ # reset header keywords
+ self.spectrumset.reset_header_keywords(
+ keywords=["ANCRFILE", "BACKFILE"])
+
+ def copy(self):
+ new = copy(self)
+ # properly handle the copy of spectrumsets
+ new.spectrumset = self.spectrumset.copy()
+ new.cross_in_specset = [ specset.copy() \
+ for specset in self.cross_in_specset ]
+ return new
+
+ def randomize(self):
+ self.spectrumset.randomize()
+ for specset in self.cross_in_specset:
+ specset.randomize()
+ return self
+
+ def get_spectrum(self, copy=True):
+ if copy:
+ return self.spectrumset.copy()
+ else:
+ return self.spectrumset
+
+ def write(self, filename=None, clobber=False):
+ self.spectrumset.write(filename=filename, clobber=clobber)
+# class Crosstalk }}}
+
+
+class Deprojection: # {{{
+ """
+ Perform the deprojection on a set of PROJECTED spectra with the
+ assumption of spherical symmetry of the source object, and produce
+ the DEPROJECTED spectra.
+
+ NOTE:
+ * Assumption of the spherical symmetry
+ * Background should be subtracted before deprojection
+ * ARF differences of different regions are taken into account
+
+ Reference & Credit:
+ [1] Direct X-ray Spectra Deprojection
+ https://www-xray.ast.cam.ac.uk/papers/dsdeproj/
+ Sanders & Fabian 2007, MNRAS, 381, 1381
+ """
+ spectra = None
+ grouping = None
+ quality = None
+
+ def __init__(self, spectra, grouping=None, quality=None, verbose=False):
+ """
+ Arguments:
+ * spectra: a set of spectra from the inner-most to the outer-most
+ regions (e.g., spectra after correcting crosstalk effect)
+ * grouping: grouping specification for all the spectra
+ * quality: quality column for the spectra
+ """
+ self.spectra = []
+ for spec in spectra:
+ if not isinstance(spec, SpectrumSet):
+ raise ValueError("Not a 'SpectrumSet' object")
+ spec.read_xflt()
+ self.spectra.append(spec)
+ self.spectra = spectra
+ self.grouping = grouping
+ self.quality = quality
+ # sort spectra by `radius_outer'
+ self.spectra.sort(key=lambda x: x.radius_outer)
+ # set the inner radii
+ radii_inner = [0.0] + [ x.radius_outer for x in self.spectra[:-1] ]
+ for spec, rin in zip(self.spectra, radii_inner):
+ spec.set_radius_inner(rin)
+ if verbose:
+ print("Deprojection: loaded spectrum: radius: (%s, %s)" % \
+ (spec.radius_inner, spec.radius_outer),
+ file=sys.stderr)
+ # check EXPOSURE validity (all spectra must have the same exposures)
+ exposures = [ spec.EXPOSURE for spec in self.spectra ]
+ assert np.allclose(exposures[:-1], exposures[1:])
+
+ def subtract_bkg(self, verbose=True):
+ for spec in self.spectra:
+ if not spec.bkg:
+ raise ValueError("Spectrum '%s' has NO background" % \
+ spec.filename)
+ spec.subtract_bkg(inplace=True, verbose=verbose)
+
+ def apply_grouping(self, verbose=False):
+ for spec in self.spectra:
+ spec.apply_grouping(grouping=self.grouping, quality=self.quality,
+ verbose=verbose)
+
+ def estimate_errors(self, gehrels=True):
+ for spec in self.spectra:
+ spec.estimate_errors(gehrels=gehrels)
+
+ def scale(self):
+ """
+ Scale the spectral data according to the region angular size.
+ """
+ for spec in self.spectra:
+ spec.scale()
+
+ def do_deprojection(self, group_squeeze=True, verbose=True):
+ #
+ # TODO/XXX: How to apply ARF correction here???
+ #
+ num_spec = len(self.spectra)
+ tmp_spec_data = self.spectra[0].get_data(group_squeeze=group_squeeze)
+ spec_shape = tmp_spec_data.shape
+ spec_dtype = tmp_spec_data.dtype
+ spec_per_vol = [None] * num_spec
+ #
+ for shellnum in reversed(range(num_spec)):
+ if verbose:
+ print("DEPROJECTION: deprojecting shell %d ..." % shellnum,
+ file=sys.stderr)
+ spec = self.spectra[shellnum]
+ # calculate projected spectrum of outlying shells
+ proj_spec = np.zeros(spec_shape, spec_dtype)
+ for outer in range(shellnum+1, num_spec):
+ vol = self.projected_volume(
+ r1=self.spectra[outer].radius_inner,
+ r2=self.spectra[outer].radius_outer,
+ R1=spec.radius_inner,
+ R2=spec.radius_outer)
+ proj_spec += spec_per_vol[outer] * vol
+ #
+ this_spec = spec.get_data(group_squeeze=group_squeeze, copy=True)
+ deproj_spec = this_spec - proj_spec
+ # calculate the volume that this spectrum is from
+ this_vol = self.projected_volume(
+ r1=spec.radius_inner, r2=spec.radius_outer,
+ R1=spec.radius_inner, R2=spec.radius_outer)
+ # calculate the spectral data per unit volume
+ spec_per_vol[shellnum] = deproj_spec / this_vol
+ # set the spectral data to these deprojected values
+ self.set_spec_data(spec_per_vol, group_squeeze=group_squeeze)
+ # add history to header
+ self.add_history()
+
+ def get_spec_data(self, group_squeeze=True, copy=True):
+ """
+ Extract the spectral data of each spectrum after deprojection
+ performed.
+ """
+ return [ spec.get_data(group_squeeze=group_squeeze, copy=copy)
+ for spec in self.spectra ]
+
+ def set_spec_data(self, spec_data, group_squeeze=True):
+ """
+ Set `spec_data' for each spectrum to the deprojected spectral data.
+ """
+ assert len(spec_data) == len(self.spectra)
+ for spec, data in zip(self.spectra, spec_data):
+ spec.set_data(data, group_squeeze=group_squeeze)
+
+ def add_history(self):
+ """
+ Append a brief history about this tool to the header.
+ """
+ history = "Deprojected by %s (v%s) @ %s" % (
+ os.path.basename(sys.argv[0]), __version__,
+ datetime.utcnow().isoformat())
+ for spec in self.spectra:
+ spec.header.add_history(history)
+
+ def add_stat_err(self, stat_err, group_squeeze=True):
+ """
+ Add the "STAT_ERR" column to each spectrum.
+ """
+ assert len(stat_err) == len(self.spectra)
+ for spec, err in zip(self.spectra, stat_err):
+ spec.add_stat_err(err, group_squeeze=group_squeeze)
+
+ def write(self, filenames=[], clobber=False):
+ """
+ Write the deprojected spectra to output file.
+ """
+ if filenames == []:
+ filenames = [ spec.outfile for spec in self.spectra ]
+ for spec, outfile in zip(self.spectra, filenames):
+ spec.write(filename=outfile, clobber=clobber)
+
+ @staticmethod
+ def projected_volume(r1, r2, R1, R2):
+ """
+ Calculate the projected volume of a spherical shell of radii r1 -> r2
+ onto an annulus on the sky of radius R1 -> R2.
+
+ This volume is the integral:
+ Int(R=R1,R2) Int(x=sqrt(r1^2-R^2),sqrt(r2^2-R^2)) 2*pi*R dx dR
+ =
+ Int(R=R1,R2) 2*pi*R * (sqrt(r2^2-R^2) - sqrt(r1^2-R^2)) dR
+
+ Note that the above integral is only half the total volume
+ (i.e., front only).
+ """
+ def sqrt_trunc(x):
+ if x > 0:
+ return np.sqrt(x)
+ else:
+ return 0.0
+ #
+ p1 = sqrt_trunc(r1**2 - R2**2)
+ p2 = sqrt_trunc(r1**2 - R1**2)
+ p3 = sqrt_trunc(r2**2 - R2**2)
+ p4 = sqrt_trunc(r2**2 - R1**2)
+ return 2.0 * (2.0/3.0) * np.pi * ((p1**3 - p2**3) + (p4**3 - p3**3))
+# class Deprojection }}}
+
+
+# Helper functions {{{
+def calc_median_errors(results):
+ """
+ Calculate the median and errors for the spectral data gathered
+ through Monte Carlo simulations.
+
+ TODO: investigate the errors calculation approach used here!
+ """
+ results = np.array(results)
+ # `results' now has shape: (mc_times, num_spec, num_channel)
+ # sort by the Monte Carlo simulation axis
+ results.sort(0)
+ mc_times = results.shape[0]
+ medians = results[ int(mc_times * 0.5) ]
+ lowerpcs = results[ int(mc_times * 0.1585) ]
+ upperpcs = results[ int(mc_times * 0.8415) ]
+ errors = np.sqrt(0.5 * ((medians-lowerpcs)**2 + (upperpcs-medians)**2))
+ return (medians, errors)
+
+
+def set_argument(name, default, cmdargs, config):
+ value = default
+ if name in config.keys():
+ value = config.as_bool(name)
+ value_cmd = vars(cmdargs)[name]
+ if value_cmd != default:
+ value = value_cmd # command arguments overwrite others
+ return value
+# helper functions }}}
+
+
+# main routine {{{
+def main(config, subtract_bkg, fix_negative, mc_times,
+ verbose=False, clobber=False):
+ # collect ARFs and RMFs into dictionaries (avoid interpolation every time)
+ arf_files = set()
+ rmf_files = set()
+ for region in config.sections:
+ config_reg = config[region]
+ arf_files.add(config_reg.get("arf"))
+ rmf_files.add(config_reg.get("rmf"))
+ for reg_in in config_reg["cross_in"].values():
+ arf_files.add(reg_in.get("arf"))
+ arf_files.add(reg_in.get("cross_arf"))
+ if "cross_out" in config_reg.sections:
+ for arf in config_reg["cross_out"].as_list("cross_arf"):
+ arf_files.add(arf)
+ arf_files = arf_files - set([None])
+ arf_dict = { arf: ARF(arf) for arf in arf_files }
+ rmf_files = rmf_files - set([None])
+ rmf_dict = { rmf: RMF(rmf) for rmf in rmf_files }
+ if verbose:
+ print("INFO: arf_files:", arf_files, file=sys.stderr)
+ print("INFO: rmf_files:", rmf_files, file=sys.stderr)
+
+ # get the GROUPING and QUALITY data
+ grouping_fits = fits.open(config["grouping"])
+ grouping = grouping_fits["SPECTRUM"].data.columns["GROUPING"].array
+ quality = grouping_fits["SPECTRUM"].data.columns["QUALITY"].array
+ # squeeze the groupped spectral data, etc.
+ group_squeeze = True
+
+ # crosstalk objects (BEFORE background subtraction)
+ crosstalks_cleancopy = []
+ # crosstalk-corrected spectra
+ cc_spectra = []
+
+ # correct crosstalk effects for each region first
+ for region in config.sections:
+ if verbose:
+ print("INFO: processing '%s' ..." % region, file=sys.stderr)
+ crosstalk = Crosstalk(config.get(region),
+ arf_dict=arf_dict, rmf_dict=rmf_dict,
+ grouping=grouping, quality=quality)
+ crosstalk.apply_grouping(verbose=verbose)
+ crosstalk.estimate_errors(verbose=verbose)
+ # keep a (almost) clean copy of the crosstalk object
+ crosstalks_cleancopy.append(crosstalk.copy())
+ if verbose:
+ print("INFO: doing crosstalk correction ...", file=sys.stderr)
+ crosstalk.do_correction(subtract_bkg=subtract_bkg,
+ fix_negative=fix_negative, group_squeeze=group_squeeze,
+ verbose=verbose)
+ cc_spectra.append(crosstalk.get_spectrum(copy=True))
+
+ # load back the crosstalk-corrected spectra for deprojection
+ if verbose:
+ print("INFO: preparing spectra for deprojection ...", file=sys.stderr)
+ deprojection = Deprojection(spectra=cc_spectra, grouping=grouping,
+ quality=quality, verbose=verbose)
+ if verbose:
+ print("INFO: scaling spectra according the region angular size...",
+ file=sys.stderr)
+ deprojection.scale()
+ if verbose:
+ print("INFO: doing deprojection ...", file=sys.stderr)
+ deprojection.do_deprojection(verbose=verbose)
+ deproj_results = [ deprojection.get_spec_data(
+ group_squeeze=group_squeeze, copy=True) ]
+
+ # Monte Carlo for spectral group error estimation
+ print("INFO: Monte Carlo to estimate spectral errors (%d times) ..." % \
+ mc_times, file=sys.stderr)
+ for i in range(mc_times):
+ if i % 100 == 0:
+ print("%d..." % i, end="", flush=True, file=sys.stderr)
+ # correct crosstalk effects
+ cc_spectra_copy = []
+ for crosstalk in crosstalks_cleancopy:
+ # copy and randomize
+ crosstalk_copy = crosstalk.copy().randomize()
+ crosstalk_copy.do_correction(subtract_bkg=subtract_bkg,
+ fix_negative=fix_negative, group_squeeze=group_squeeze,
+ verbose=False)
+ cc_spectra_copy.append(crosstalk_copy.get_spectrum(copy=True))
+ # deproject spectra
+ deprojection_copy = Deprojection(spectra=cc_spectra_copy,
+ grouping=grouping, quality=quality, verbose=False)
+ deprojection_copy.scale()
+ deprojection_copy.do_deprojection(verbose=False)
+ deproj_results.append(deprojection_copy.get_spec_data(
+ group_squeeze=group_squeeze, copy=True))
+ print("DONE!", flush=True, file=sys.stderr)
+
+ if verbose:
+ print("INFO: Calculating the median and errors for each spectrum ...",
+ file=sys.stderr)
+ medians, errors = calc_median_errors(deproj_results)
+ deprojection.set_spec_data(medians, group_squeeze=group_squeeze)
+ deprojection.add_stat_err(errors, group_squeeze=group_squeeze)
+ if verbose:
+ print("INFO: Writing the crosstalk-corrected and deprojected " + \
+ "spectra with estimated statistical errors ...",
+ file=sys.stderr)
+ deprojection.write(clobber=clobber)
+# main routine }}}
+
+
+# main_deprojection routine {{{
+def main_deprojection(config, mc_times, verbose=False, clobber=False):
+ """
+ Only perform the spectral deprojection.
+ """
+ # collect ARFs and RMFs into dictionaries (avoid interpolation every time)
+ arf_files = set()
+ rmf_files = set()
+ for region in config.sections:
+ config_reg = config[region]
+ arf_files.add(config_reg.get("arf"))
+ rmf_files.add(config_reg.get("rmf"))
+ arf_files = arf_files - set([None])
+ arf_dict = { arf: ARF(arf) for arf in arf_files }
+ rmf_files = rmf_files - set([None])
+ rmf_dict = { rmf: RMF(rmf) for rmf in rmf_files }
+ if verbose:
+ print("INFO: arf_files:", arf_files, file=sys.stderr)
+ print("INFO: rmf_files:", rmf_files, file=sys.stderr)
+
+ # get the GROUPING and QUALITY data
+ grouping_fits = fits.open(config["grouping"])
+ grouping = grouping_fits["SPECTRUM"].data.columns["GROUPING"].array
+ quality = grouping_fits["SPECTRUM"].data.columns["QUALITY"].array
+ # squeeze the groupped spectral data, etc.
+ group_squeeze = True
+
+ # load spectra for deprojection
+ if verbose:
+ print("INFO: preparing spectra for deprojection ...", file=sys.stderr)
+ proj_spectra = []
+ for region in config.sections:
+ config_reg = config[region]
+ specset = SpectrumSet(filename=config_reg["spec"],
+ outfile=config_reg["outfile"],
+ arf=arf_dict.get(config_reg["arf"], config_reg["arf"]),
+ rmf=rmf_dict.get(config_reg["rmf"], config_reg["rmf"]),
+ bkg=config_reg["bkg"])
+ proj_spectra.append(specset)
+
+ deprojection = Deprojection(spectra=proj_spectra, grouping=grouping,
+ quality=quality, verbose=verbose)
+ deprojection.apply_grouping(verbose=verbose)
+ deprojection.estimate_errors()
+ if verbose:
+ print("INFO: scaling spectra according the region angular size ...",
+ file=sys.stderr)
+ deprojection.scale()
+
+ # keep a (almost) clean copy of the input projected spectra
+ proj_spectra_cleancopy = [ spec.copy() for spec in proj_spectra ]
+
+ if verbose:
+ print("INFO: subtract the background ...", file=sys.stderr)
+ deprojection.subtract_bkg(verbose=verbose)
+ if verbose:
+ print("INFO: doing deprojection ...", file=sys.stderr)
+ deprojection.do_deprojection(verbose=verbose)
+ deproj_results = [ deprojection.get_spec_data(
+ group_squeeze=group_squeeze, copy=True) ]
+
+ # Monte Carlo for spectral group error estimation
+ print("INFO: Monte Carlo to estimate spectral errors (%d times) ..." % \
+ mc_times, file=sys.stderr)
+ for i in range(mc_times):
+ if i % 100 == 0:
+ print("%d..." % i, end="", flush=True, file=sys.stderr)
+ # copy and randomize the input projected spectra
+ proj_spectra_copy = [ spec.copy().randomize()
+ for spec in proj_spectra_cleancopy ]
+ # deproject spectra
+ deprojection_copy = Deprojection(spectra=proj_spectra_copy,
+ grouping=grouping, quality=quality, verbose=False)
+ deprojection_copy.subtract_bkg(verbose=False)
+ deprojection_copy.do_deprojection(verbose=False)
+ deproj_results.append(deprojection_copy.get_spec_data(
+ group_squeeze=group_squeeze, copy=True))
+ print("DONE!", flush=True, file=sys.stderr)
+
+ if verbose:
+ print("INFO: Calculating the median and errors for each spectrum ...",
+ file=sys.stderr)
+ medians, errors = calc_median_errors(deproj_results)
+ deprojection.set_spec_data(medians, group_squeeze=group_squeeze)
+ deprojection.add_stat_err(errors, group_squeeze=group_squeeze)
+ if verbose:
+ print("INFO: Writing the deprojected spectra " + \
+ "with estimated statistical errors ...",
+ file=sys.stderr)
+ deprojection.write(clobber=clobber)
+# main_deprojection routine }}}
+
+
+# main_crosstalk routine {{{
+def main_crosstalk(config, subtract_bkg, fix_negative, mc_times,
+ verbose=False, clobber=False):
+ """
+ Only perform the crosstalk correction.
+ """
+ # collect ARFs and RMFs into dictionaries (avoid interpolation every time)
+ arf_files = set()
+ rmf_files = set()
+ for region in config.sections:
+ config_reg = config[region]
+ arf_files.add(config_reg.get("arf"))
+ rmf_files.add(config_reg.get("rmf"))
+ for reg_in in config_reg["cross_in"].values():
+ arf_files.add(reg_in.get("arf"))
+ arf_files.add(reg_in.get("cross_arf"))
+ if "cross_out" in config_reg.sections:
+ for arf in config_reg["cross_out"].as_list("cross_arf"):
+ arf_files.add(arf)
+ arf_files = arf_files - set([None])
+ arf_dict = { arf: ARF(arf) for arf in arf_files }
+ rmf_files = rmf_files - set([None])
+ rmf_dict = { rmf: RMF(rmf) for rmf in rmf_files }
+ if verbose:
+ print("INFO: arf_files:", arf_files, file=sys.stderr)
+ print("INFO: rmf_files:", rmf_files, file=sys.stderr)
+
+ # get the GROUPING and QUALITY data
+ if "grouping" in config.keys():
+ grouping_fits = fits.open(config["grouping"])
+ grouping = grouping_fits["SPECTRUM"].data.columns["GROUPING"].array
+ quality = grouping_fits["SPECTRUM"].data.columns["QUALITY"].array
+ group_squeeze = True
+ else:
+ grouping = None
+ quality = None
+ group_squeeze = False
+
+ # crosstalk objects (BEFORE background subtraction)
+ crosstalks_cleancopy = []
+ # crosstalk-corrected spectra
+ cc_spectra = []
+
+ # correct crosstalk effects for each region first
+ for region in config.sections:
+ if verbose:
+ print("INFO: processing '%s' ..." % region, file=sys.stderr)
+ crosstalk = Crosstalk(config.get(region),
+ arf_dict=arf_dict, rmf_dict=rmf_dict,
+ grouping=grouping, quality=quality)
+ if grouping is not None:
+ crosstalk.apply_grouping(verbose=verbose)
+ crosstalk.estimate_errors(verbose=verbose)
+ # keep a (almost) clean copy of the crosstalk object
+ crosstalks_cleancopy.append(crosstalk.copy())
+ if verbose:
+ print("INFO: doing crosstalk correction ...", file=sys.stderr)
+ crosstalk.do_correction(subtract_bkg=subtract_bkg,
+ fix_negative=fix_negative, group_squeeze=group_squeeze,
+ verbose=verbose)
+ cc_spectra.append(crosstalk.get_spectrum(copy=True))
+
+ # spectral data of the crosstalk-corrected spectra
+ cc_results = []
+ cc_results.append([ spec.get_data(group_squeeze=group_squeeze, copy=True)
+ for spec in cc_spectra ])
+
+ # Monte Carlo for spectral group error estimation
+ print("INFO: Monte Carlo to estimate spectral errors (%d times) ..." % \
+ mc_times, file=sys.stderr)
+ for i in range(mc_times):
+ if i % 100 == 0:
+ print("%d..." % i, end="", flush=True, file=sys.stderr)
+ # correct crosstalk effects
+ cc_spectra_copy = []
+ for crosstalk in crosstalks_cleancopy:
+ # copy and randomize
+ crosstalk_copy = crosstalk.copy().randomize()
+ crosstalk_copy.do_correction(subtract_bkg=subtract_bkg,
+ fix_negative=fix_negative, group_squeeze=group_squeeze,
+ verbose=False)
+ cc_spectra_copy.append(crosstalk_copy.get_spectrum(copy=True))
+ cc_results.append([ spec.get_data(group_squeeze=group_squeeze,
+ copy=True)
+ for spec in cc_spectra_copy ])
+ print("DONE!", flush=True, file=sys.stderr)
+
+ if verbose:
+ print("INFO: Calculating the median and errors for each spectrum ...",
+ file=sys.stderr)
+ medians, errors = calc_median_errors(cc_results)
+ if verbose:
+ print("INFO: Writing the crosstalk-corrected spectra " + \
+ "with estimated statistical errors ...",
+ file=sys.stderr)
+ for spec, data, err in zip(cc_spectra, medians, errors):
+ spec.set_data(data, group_squeeze=group_squeeze)
+ spec.add_stat_err(err, group_squeeze=group_squeeze)
+ spec.write(clobber=clobber)
+# main_crosstalk routine }}}
+
+
+if __name__ == "__main__":
+ # arguments' default values
+ default_mode = "both"
+ default_mc_times = 5000
+ # commandline arguments parser
+ parser = argparse.ArgumentParser(
+ description="Correct the crosstalk effects for XMM EPIC spectra",
+ epilog="Version: %s (%s)" % (__version__, __date__))
+ parser.add_argument("config", help="config file in which describes " +\
+ "the crosstalk relations ('ConfigObj' syntax)")
+ parser.add_argument("-m", "--mode", dest="mode", default=default_mode,
+ help="operation mode (both | crosstalk | deprojection)")
+ parser.add_argument("-B", "--no-subtract-bkg", dest="subtract_bkg",
+ action="store_false", help="do NOT subtract background first")
+ parser.add_argument("-N", "--fix-negative", dest="fix_negative",
+ action="store_true", help="fix negative channel values")
+ parser.add_argument("-M", "--mc-times", dest="mc_times",
+ type=int, default=default_mc_times,
+ help="Monte Carlo times for error estimation")
+ parser.add_argument("-C", "--clobber", dest="clobber",
+ action="store_true", help="overwrite output file if exists")
+ parser.add_argument("-v", "--verbose", dest="verbose",
+ action="store_true", help="show verbose information")
+ args = parser.parse_args()
+ # merge commandline arguments and config
+ config = ConfigObj(args.config)
+ subtract_bkg = set_argument("subtract_bkg", True, args, config)
+ fix_negative = set_argument("fix_negative", False, args, config)
+ verbose = set_argument("verbose", False, args, config)
+ clobber = set_argument("clobber", False, args, config)
+ # operation mode
+ mode = config.get("mode", default_mode)
+ if args.mode != default_mode:
+ mode = args.mode
+ # Monte Carlo times
+ mc_times = config.as_int("mc_times")
+ if args.mc_times != default_mc_times:
+ mc_times = args.mc_times
+
+ if mode.lower() == "both":
+ print("MODE: CROSSTALK + DEPROJECTION", file=sys.stderr)
+ main(config, subtract_bkg=subtract_bkg, fix_negative=fix_negative,
+ mc_times=mc_times, verbose=verbose, clobber=clobber)
+ elif mode.lower() == "deprojection":
+ print("MODE: DEPROJECTION", file=sys.stderr)
+ main_deprojection(config, mc_times=mc_times,
+ verbose=verbose, clobber=clobber)
+ elif mode.lower() == "crosstalk":
+ print("MODE: CROSSTALK", file=sys.stderr)
+ main_crosstalk(config, subtract_bkg=subtract_bkg,
+ fix_negative=fix_negative, mc_times=mc_times,
+ verbose=verbose, clobber=clobber)
+ else:
+ raise ValueError("Invalid operation mode: %s" % mode)
+ print(WARNING)
+
+# vim: set ts=4 sw=4 tw=0 fenc=utf-8 ft=python: #