aboutsummaryrefslogtreecommitdiffstats
path: root/astro/spectrum/crosstalk_deprojection.py
diff options
context:
space:
mode:
Diffstat (limited to 'astro/spectrum/crosstalk_deprojection.py')
-rwxr-xr-xastro/spectrum/crosstalk_deprojection.py498
1 files changed, 256 insertions, 242 deletions
diff --git a/astro/spectrum/crosstalk_deprojection.py b/astro/spectrum/crosstalk_deprojection.py
index 617b8ff..0c04d77 100755
--- a/astro/spectrum/crosstalk_deprojection.py
+++ b/astro/spectrum/crosstalk_deprojection.py
@@ -207,7 +207,6 @@ class ARF: # {{{
https://heasarc.gsfc.nasa.gov/docs/heasarc/caldb/docs/memos/cal_gen_92_002/cal_gen_92_002.html
"""
filename = None
- fitsobj = None
# only consider the "SPECTRUM" extension
header = None
energ_lo = None
@@ -225,12 +224,12 @@ class ARF: # {{{
def __init__(self, filename):
self.filename = filename
- self.fitsobj = fits.open(filename)
- ext_specresp = self.fitsobj["SPECRESP"]
- self.header = ext_specresp.header
- self.energ_lo = ext_specresp.data["ENERG_LO"]
- self.energ_hi = ext_specresp.data["ENERG_HI"]
- self.specresp = ext_specresp.data["SPECRESP"]
+ with fits.open(filename) as fitsobj:
+ ext_specresp = fitsobj["SPECRESP"]
+ self.header = ext_specresp.header
+ self.energ_lo = ext_specresp.data["ENERG_LO"]
+ self.energ_hi = ext_specresp.data["ENERG_HI"]
+ self.specresp = ext_specresp.data["SPECRESP"]
def get_data(self, groupped=False, group_squeeze=False, copy=True):
if groupped:
@@ -277,13 +276,15 @@ class ARF: # {{{
"""
if not hasattr(self, "f_interp") or self.f_interp is None:
energy = self.get_energy()
- arf = self.get_data(copy=False)
+ arf = self.get_data(copy=False)
if verbose:
print("INFO: interpolating ARF '%s' (may take a while) ..." %
self.filename, file=sys.stderr)
- f_interp = scipy.interpolate.interp1d(energy, arf, kind="cubic",
- bounds_error=False, fill_value=0.0, assume_sorted=True)
+ f_interp = scipy.interpolate.interp1d(
+ energy, arf, kind="cubic", bounds_error=False,
+ fill_value=0.0, assume_sorted=True)
self.f_interp = f_interp
+
if x is not None:
return self.f_interp(x)
else:
@@ -304,7 +305,7 @@ class ARF: # {{{
return
if verbose:
print("INFO: Grouping spectrum '%s' ..." % self.filename,
- file=sys.stderr)
+ file=sys.stderr)
self.energy_channel = energy_channel
self.grouping = grouping
# interpolate the ARF w.r.t the spectral channel energies
@@ -349,7 +350,6 @@ class RMF: # {{{
https://heasarc.gsfc.nasa.gov/docs/heasarc/caldb/docs/memos/cal_gen_92_002/cal_gen_92_002.html
"""
filename = None
- fitsobj = None
## extension "MATRIX"
hdr_matrix = None
energ_lo = None
@@ -369,23 +369,23 @@ class RMF: # {{{
rmfimg = None
def __init__(self, filename):
- self.filename = filename
- self.fitsobj = fits.open(filename)
- ## "MATRIX" extension
- ext_matrix = self.fitsobj["MATRIX"]
- self.hdr_matrix = ext_matrix.header
- self.energ_lo = ext_matrix.data["ENERG_LO"]
- self.energ_hi = ext_matrix.data["ENERG_HI"]
- self.n_grp = ext_matrix.data["N_GRP"]
- self.f_chan = ext_matrix.data["F_CHAN"]
- self.n_chan = ext_matrix.data["N_CHAN"]
- self.matrix = ext_matrix.data["MATRIX"]
- ## "EBOUNDS" extension
- ext_ebounds = self.fitsobj["EBOUNDS"]
- self.hdr_ebounds = ext_ebounds.header
- self.channel = ext_ebounds.data["CHANNEL"]
- self.e_min = ext_ebounds.data["E_MIN"]
- self.e_max = ext_ebounds.data["E_MAX"]
+ self.filename = filename
+ with fits.open(filename) as fitsobj:
+ # "MATRIX" extension
+ ext_matrix = fitsobj["MATRIX"]
+ self.hdr_matrix = ext_matrix.header
+ self.energ_lo = ext_matrix.data["ENERG_LO"]
+ self.energ_hi = ext_matrix.data["ENERG_HI"]
+ self.n_grp = ext_matrix.data["N_GRP"]
+ self.f_chan = ext_matrix.data["F_CHAN"]
+ self.n_chan = ext_matrix.data["N_CHAN"]
+ self.matrix = ext_matrix.data["MATRIX"]
+ # "EBOUNDS" extension
+ ext_ebounds = fitsobj["EBOUNDS"]
+ self.hdr_ebounds = ext_ebounds.header
+ self.channel = ext_ebounds.data["CHANNEL"]
+ self.e_min = ext_ebounds.data["E_MIN"]
+ self.e_max = ext_ebounds.data["E_MAX"]
def get_energy(self, mean="geometric"):
"""
@@ -428,7 +428,9 @@ class RMF: # {{{
rmfimg = np.zeros(shape=(n_energy, n_channel), dtype=rmf_dtype)
for i in np.arange(n_energy)[self.n_grp > 0]:
rmfimg[i, :] = _make_rmfimg_row(n_channel, rmf_dtype,
- self.f_chan[i], self.n_chan[i], self.matrix[i])
+ self.f_chan[i],
+ self.n_chan[i],
+ self.matrix[i])
self.rmfimg = rmfimg
return self.rmfimg
@@ -446,13 +448,11 @@ class Spectrum: # {{{
"""
Class that deals with the X-ray spectrum file (usually *.pi).
"""
- filename = None
- # FITS object return by `fits.open()'
- fitsobj = None
+ filename = None
# header of "SPECTRUM" extension
- header = None
+ header = None
# "SPECTRUM" extension data
- channel = None
+ channel = None
# name of the spectrum data column (i.e., type, "COUNTS" or "RATE")
spec_type = None
# unit of the spectrum data ("count" for "COUNTS", "count/s" for "RATE")
@@ -460,50 +460,49 @@ class Spectrum: # {{{
# spectrum data
spec_data = None
# estimated spectral errors for each channel/group
- spec_err = None
+ spec_err = None
# statistical errors for each channel/group
- stat_err = None
+ stat_err = None
# grouping and quality
- grouping = None
- quality = None
+ grouping = None
+ quality = None
# whether the spectral data being groupped
- groupped = False
+ groupped = False
# several important keywords
- EXPOSURE = None
- BACKSCAL = None
- AREASCAL = None
- RESPFILE = None
- ANCRFILE = None
- BACKFILE = None
+ EXPOSURE = None
+ BACKSCAL = None
+ AREASCAL = None
+ RESPFILE = None
+ ANCRFILE = None
+ BACKFILE = None
# numpy dtype and FITS format code of the spectrum data
- spec_dtype = None
+ spec_dtype = None
spec_fits_format = None
- # output filename for writing the spectrum if no filename provided
- outfile = None
- def __init__(self, filename, outfile=None):
+ def __init__(self, filename):
self.filename = filename
- self.fitsobj = fits.open(filename)
- ext_spec = self.fitsobj["SPECTRUM"]
- self.header = ext_spec.header.copy(strip=True)
- colnames = ext_spec.columns.names
- if "COUNTS" in colnames:
- self.spec_type = "COUNTS"
- elif "RATE" in colnames:
- self.spec_type = "RATE"
- else:
- raise ValueError("Invalid spectrum file")
- self.channel = ext_spec.data.columns["CHANNEL"].array
- col_spec_data = ext_spec.data.columns[self.spec_type]
- self.spec_data = col_spec_data.array.copy()
- self.spec_unit = col_spec_data.unit
- self.spec_dtype = col_spec_data.dtype
- self.spec_fits_format = col_spec_data.format
- # grouping and quality
- if "GROUPING" in colnames:
- self.grouping = ext_spec.data.columns["GROUPING"].array
- if "QUALITY" in colnames:
- self.quality = ext_spec.data.columns["QUALITY"].array
+ with fits.open(filename) as fitsobj:
+ ext_spec = fitsobj["SPECTRUM"]
+ self.header = ext_spec.header.copy(strip=True)
+ colnames = ext_spec.columns.names
+ if "COUNTS" in colnames:
+ self.spec_type = "COUNTS"
+ elif "RATE" in colnames:
+ self.spec_type = "RATE"
+ else:
+ raise ValueError("Invalid spectrum file")
+ self.channel = ext_spec.data.columns["CHANNEL"].array
+ col_spec_data = ext_spec.data.columns[self.spec_type]
+ self.spec_data = col_spec_data.array.copy()
+ self.spec_unit = col_spec_data.unit
+ self.spec_dtype = col_spec_data.dtype
+ self.spec_fits_format = col_spec_data.format
+ # grouping and quality
+ if "GROUPING" in colnames:
+ self.grouping = ext_spec.data.columns["GROUPING"].array
+ if "QUALITY" in colnames:
+ self.quality = ext_spec.data.columns["QUALITY"].array
+
# keywords
self.EXPOSURE = self.header.get("EXPOSURE")
self.BACKSCAL = self.header.get("BACKSCAL")
@@ -511,8 +510,6 @@ class Spectrum: # {{{
self.RESPFILE = self.header.get("RESPFILE")
self.ANCRFILE = self.header.get("ANCRFILE")
self.BACKFILE = self.header.get("BACKFILE")
- # output filename
- self.outfile = outfile
def get_data(self, group_squeeze=False, copy=True):
"""
@@ -558,7 +555,7 @@ class Spectrum: # {{{
group, which are estimated by utilizing the Monte Carlo techniques.
"""
self.stat_err = np.zeros(self.spec_data.shape,
- dtype=self.spec_data.dtype)
+ dtype=self.spec_data.dtype)
if group_squeeze and self.groupped:
assert sum(self.grouping == 1) == len(stat_err)
self.stat_err[self.grouping == 1] = stat_err
@@ -627,13 +624,13 @@ class Spectrum: # {{{
if self.groupped:
idx = self.grouping == 1
self.spec_data[idx] = np.random.normal(self.spec_data[idx],
- self.spec_err[idx])
+ self.spec_err[idx])
else:
self.spec_data = np.random.normal(self.spec_data, self.spec_err)
return self
def fix_header_keywords(self,
- reset_kw=["ANCRFILE", "RESPFILE", "BACKFILE"]):
+ reset_kw=["ANCRFILE", "RESPFILE", "BACKFILE"]):
"""
Reset the keywords to "NONE" to avoid confusion or mistakes,
and also add mandatory spectral keywords if missing.
@@ -644,36 +641,37 @@ class Spectrum: # {{{
https://heasarc.gsfc.nasa.gov/docs/heasarc/ofwg/docs/summary/ogip_92_007_summary.html
"""
default_keywords = {
- ## Mandatory keywords
- #"EXTNAME" : "SPECTRUM",
- "TELESCOP" : "NONE",
- "INSTRUME" : "NONE",
- "FILTER" : "NONE",
- #"EXPOSURE" : <integration_time (s)>,
- "BACKFILE" : "NONE",
- "CORRFILE" : "NONE",
- "CORRSCAL" : 1.0,
- "RESPFILE" : "NONE",
- "ANCRFILE" : "NONE",
- "HDUCLASS" : "OGIP",
- "HDUCLAS1" : "SPECTRUM",
- "HDUVERS" : "1.2.1",
- "POISSERR" : True,
- #"CHANTYPE" : "PI",
- #"DETCHANS" : <total_number_of_detector_channels>,
- ## Optional keywords for further information
- "BACKSCAL" : 1.0,
- "AREASCAL" : 1.0,
- # Type of spectral data:
- # (1) "TOTAL": gross spectrum (source+bkg);
- # (2) "NET": background-subtracted spectrum
- # (3) "BKG" background spectrum
- #"HDUCLAS2" : "NET",
- # Details of the type of data:
- # (1) "COUNT": data stored as counts
- # (2) "RATE": data stored as counts/s
- "HDUCLAS3" : { "COUNTS":"COUNT",
- "RATE":"RATE" }.get(self.spec_type),
+ # Mandatory keywords
+ # "EXTNAME": "SPECTRUM",
+ "TELESCOP": "NONE",
+ "INSTRUME": "NONE",
+ "FILTER": "NONE",
+ # "EXPOSURE" : <integration_time (s)>,
+ "BACKFILE": "NONE",
+ "CORRFILE": "NONE",
+ "CORRSCAL": 1.0,
+ "RESPFILE": "NONE",
+ "ANCRFILE": "NONE",
+ "HDUCLASS": "OGIP",
+ "HDUCLAS1": "SPECTRUM",
+ "HDUVERS": "1.2.1",
+ "POISSERR": True,
+ # "CHANTYPE": "PI",
+ # "DETCHANS": <total_number_of_detector_channels>,
+
+ # Optional keywords for further information
+ "BACKSCAL": 1.0,
+ "AREASCAL": 1.0,
+ # Type of spectral data:
+ # (1) "TOTAL": gross spectrum (source+bkg);
+ # (2) "NET": background-subtracted spectrum
+ # (3) "BKG" background spectrum
+ # "HDUCLAS2": "NET",
+ # Details of the type of data:
+ # (1) "COUNT": data stored as counts
+ # (2) "RATE": data stored as counts/s
+ "HDUCLAS3": {"COUNTS": "COUNT",
+ "RATE": "RATE"}.get(self.spec_type),
}
# add mandatory keywords if missing
for kw, value in default_keywords.items():
@@ -683,13 +681,13 @@ class Spectrum: # {{{
for kw in reset_kw:
self.header[kw] = default_keywords.get(kw)
- def write(self, filename=None, clobber=False):
+ def write(self, outfile, clobber=False):
"""
Create a new "SPECTRUM" table/extension and replace the original
one, then write to output file.
"""
- if filename is None:
- filename = self.outfile
+ # Open the original input spectrum as the base
+ fitsobj = fits.open(self.filename)
columns = [
fits.Column(name="CHANNEL", format="I", array=self.channel),
fits.Column(name=self.spec_type, format=self.spec_fits_format,
@@ -698,7 +696,7 @@ class Spectrum: # {{{
if self.grouping is not None:
columns.append(fits.Column(name="GROUPING",
format="I", array=self.grouping))
- if self.quality is not None:
+ if self.quality is not None:
columns.append(fits.Column(name="QUALITY",
format="I", array=self.quality))
if self.stat_err is not None:
@@ -708,8 +706,9 @@ class Spectrum: # {{{
ext_spec_cols = fits.ColDefs(columns)
ext_spec = fits.BinTableHDU.from_columns(ext_spec_cols,
header=self.header)
- self.fitsobj["SPECTRUM"] = ext_spec
- self.fitsobj.writeto(filename, clobber=clobber, checksum=True)
+ # Replace the original spectrum data
+ fitsobj["SPECTRUM"] = ext_spec
+ fitsobj.writeto(outfile, clobber=clobber, checksum=True)
# class Spectrum }}}
@@ -738,17 +737,16 @@ class SpectrumSet(Spectrum): # {{{
# numpy dtype and FITS format code to which the spectrum data be
# converted if the data is "COUNTS"
- #_spec_dtype = np.float32
- #_spec_fits_format = "E"
- _spec_dtype = np.float64
+ _spec_dtype = np.float64
_spec_fits_format = "D"
def __init__(self, filename, outfile=None, arf=None, rmf=None, bkg=None):
- super().__init__(filename, outfile)
+ super().__init__(filename=filename)
+ self.outfile = outfile
# convert spectrum data type if necessary
if self.spec_data.dtype != self._spec_dtype:
- self.spec_data = self.spec_data.astype(self._spec_dtype)
- self.spec_dtype = self._spec_dtype
+ self.spec_data = self.spec_data.astype(self._spec_dtype)
+ self.spec_dtype = self._spec_dtype
self.spec_fits_format = self._spec_fits_format
if arf is not None:
if isinstance(arf, ARF):
@@ -771,6 +769,11 @@ class SpectrumSet(Spectrum): # {{{
self.bkg.spec_dtype = self._spec_dtype
self.bkg.spec_fits_format = self._spec_fits_format
+ def write(self, outfile=None, clobber=False):
+ if outfile is None:
+ outfile = self.outfile
+ super().write(outfile=outfile, clobber=clobber)
+
def get_energy(self, mean="geometric"):
"""
Get the energy values of each channel if RMF present.
@@ -824,7 +827,7 @@ class SpectrumSet(Spectrum): # {{{
while True:
try:
angle_begin = float(self.header["XFLT%04d" % num])
- angle_end = float(self.header["XFLT%04d" % (num+1)])
+ angle_end = float(self.header["XFLT%04d" % (num+1)])
num += 2
except KeyError:
break
@@ -936,7 +939,7 @@ class SpectrumSet(Spectrum): # {{{
return spec_data_subbkg
def subtract(self, spectrumset, cross_arf, groupped=False,
- group_squeeze=False, add_history=False, verbose=False):
+ group_squeeze=False, add_history=False, verbose=False):
"""
Subtract the photons that originate from the surrounding regions
but were scattered into this spectrum due to the finite PSF.
@@ -952,30 +955,32 @@ class SpectrumSet(Spectrum): # {{{
spec1_new = spec1 - spec2 * (cross_arf_2_to_1 / arf2)
2. The ARF are interpolated to match the energies of spetral channels.
"""
- operation = " SUBTRACT: %s - (%s/%s) * %s" % (self.filename,
- cross_arf.filename, spectrumset.arf.filename,
- spectrumset.filename)
+ operation = " SUBTRACT: %s - (%s/%s) * %s" % (
+ self.filename, cross_arf.filename, spectrumset.arf.filename,
+ spectrumset.filename)
if verbose:
print(operation, file=sys.stderr)
energy = self.get_energy()
if groupped:
spectrumset.arf.apply_grouping(energy_channel=energy,
- grouping=self.grouping, verbose=verbose)
+ grouping=self.grouping,
+ verbose=verbose)
cross_arf.apply_grouping(energy_channel=energy,
- grouping=self.grouping, verbose=verbose)
- arfresp_spec = spectrumset.arf.get_data(groupped=True,
- group_squeeze=group_squeeze)
- arfresp_cross = cross_arf.get_data(groupped=True,
- group_squeeze=group_squeeze)
+ grouping=self.grouping,
+ verbose=verbose)
+ arfresp_spec = spectrumset.arf.get_data(
+ groupped=True, group_squeeze=group_squeeze)
+ arfresp_cross = cross_arf.get_data(
+ groupped=True, group_squeeze=group_squeeze)
else:
- arfresp_spec = spectrumset.arf.interpolate(x=energy,
- verbose=verbose)
+ arfresp_spec = spectrumset.arf.interpolate(
+ x=energy, verbose=verbose)
arfresp_cross = cross_arf.interpolate(x=energy, verbose=verbose)
with np.errstate(divide="ignore", invalid="ignore"):
arf_ratio = arfresp_cross / arfresp_spec
# fix nan/inf values due to division by zero
- arf_ratio[ ~ np.isfinite(arf_ratio) ] = 0.0
- spec_data = (self.get_data(group_squeeze=group_squeeze) -
+ arf_ratio[~np.isfinite(arf_ratio)] = 0.0
+ spec_data = (self.get_data(group_squeeze=group_squeeze) -
spectrumset.get_data(group_squeeze=group_squeeze)*arf_ratio)
self.set_data(spec_data, group_squeeze=group_squeeze)
# record history
@@ -983,7 +988,7 @@ class SpectrumSet(Spectrum): # {{{
self.header.add_history(operation)
def compensate(self, cross_arf, groupped=False, group_squeeze=False,
- add_history=False, verbose=False):
+ add_history=False, verbose=False):
"""
Compensate the photons that originate from this regions but were
scattered into the surrounding regions due to the finite PSF.
@@ -991,25 +996,27 @@ class SpectrumSet(Spectrum): # {{{
formula:
spec1_new = spec1 + spec1 * (cross_arf_1_to_2 / arf1)
"""
- operation = " COMPENSATE: %s + (%s/%s) * %s" % (self.filename,
- cross_arf.filename, self.arf.filename, self.filename)
+ operation = " COMPENSATE: %s + (%s/%s) * %s" % (
+ self.filename, cross_arf.filename, self.arf.filename,
+ self.filename)
if verbose:
print(operation, file=sys.stderr)
energy = self.get_energy()
if groupped:
cross_arf.apply_grouping(energy_channel=energy,
- grouping=self.grouping, verbose=verbose)
- arfresp_this = self.arf.get_data(groupped=True,
- group_squeeze=group_squeeze)
- arfresp_cross = cross_arf.get_data(groupped=True,
- group_squeeze=group_squeeze)
+ grouping=self.grouping,
+ verbose=verbose)
+ arfresp_this = self.arf.get_data(
+ groupped=True, group_squeeze=group_squeeze)
+ arfresp_cross = cross_arf.get_data(
+ groupped=True, group_squeeze=group_squeeze)
else:
- arfresp_this = self.arf.interpolate(x=energy, verbose=verbose)
+ arfresp_this = self.arf.interpolate(x=energy, verbose=verbose)
arfresp_cross = cross_arf.interpolate(x=energy, verbose=verbose)
with np.errstate(divide="ignore", invalid="ignore"):
arf_ratio = arfresp_cross / arfresp_this
# fix nan/inf values due to division by zero
- arf_ratio[ ~ np.isfinite(arf_ratio) ] = 0.0
+ arf_ratio[~np.isfinite(arf_ratio)] = 0.0
spec_data = (self.get_data(group_squeeze=group_squeeze) +
self.get_data(group_squeeze=group_squeeze) * arf_ratio)
self.set_data(spec_data, group_squeeze=group_squeeze)
@@ -1034,7 +1041,7 @@ class SpectrumSet(Spectrum): # {{{
if verbose:
if i == 1:
print("*** Fixing negative channels: iter %d..." % i,
- end="", file=sys.stderr)
+ end="", file=sys.stderr)
else:
print("%d..." % i, end="", file=sys.stderr)
for ch in neg_channels:
@@ -1095,8 +1102,6 @@ class Crosstalk: # {{{
"""
# `SpectrumSet' object for the spectrum to be corrected
spectrumset = None
- # NOTE/XXX: do NOT use list (e.g., []) here, otherwise, all the
- # instances will share these list properties.
# `SpectrumSet' and `ARF' objects corresponding to the spectra from
# which the photons were scattered into this spectrum.
cross_in_specset = None
@@ -1111,7 +1116,7 @@ class Crosstalk: # {{{
groupped = False
def __init__(self, config, arf_dict={}, rmf_dict={},
- grouping=None, quality=None):
+ grouping=None, quality=None):
"""
Arguments:
* config: a section of the whole config file (`ConfigObj' object)
@@ -1120,17 +1125,19 @@ class Crosstalk: # {{{
self.cross_in_arf = []
self.cross_out_arf = []
# this spectrum to be corrected
- self.spectrumset = SpectrumSet(filename=config["spec"],
- outfile=config["outfile"],
- arf=arf_dict.get(config["arf"], config["arf"]),
- rmf=rmf_dict.get(config.get("rmf"), config.get("rmf")),
- bkg=config.get("bkg"))
+ self.spectrumset = SpectrumSet(
+ filename=config["spec"],
+ outfile=config["outfile"],
+ arf=arf_dict.get(config["arf"], config["arf"]),
+ rmf=rmf_dict.get(config.get("rmf"), config.get("rmf")),
+ bkg=config.get("bkg"))
# spectra and cross arf from which photons were scattered in
for reg_in in config["cross_in"].values():
- specset = SpectrumSet(filename=reg_in["spec"],
- arf=arf_dict.get(reg_in["arf"], reg_in["arf"]),
- rmf=rmf_dict.get(reg_in.get("rmf"), reg_in.get("rmf")),
- bkg=reg_in.get("bkg"))
+ specset = SpectrumSet(
+ filename=reg_in["spec"],
+ arf=arf_dict.get(reg_in["arf"], reg_in["arf"]),
+ rmf=rmf_dict.get(reg_in.get("rmf"), reg_in.get("rmf")),
+ bkg=reg_in.get("bkg"))
self.cross_in_specset.append(specset)
self.cross_in_arf.append(arf_dict.get(reg_in["cross_arf"],
ARF(reg_in["cross_arf"])))
@@ -1145,11 +1152,11 @@ class Crosstalk: # {{{
def apply_grouping(self, verbose=False):
self.spectrumset.apply_grouping(grouping=self.grouping,
- quality=self.quality, verbose=verbose)
+ quality=self.quality, verbose=verbose)
# also group the related surrounding spectra
for specset in self.cross_in_specset:
specset.apply_grouping(grouping=self.grouping,
- quality=self.quality, verbose=verbose)
+ quality=self.quality, verbose=verbose)
self.groupped = True
def estimate_errors(self, gehrels=True, verbose=False):
@@ -1161,7 +1168,7 @@ class Crosstalk: # {{{
specset.estimate_errors(gehrels=gehrels)
def do_correction(self, subtract_bkg=True, fix_negative=False,
- group_squeeze=True, add_history=False, verbose=False):
+ group_squeeze=True, add_history=False, verbose=False):
"""
Perform the crosstalk correction. The background contribution
for each spectrum is subtracted first if `subtract_bkg' is True.
@@ -1177,33 +1184,36 @@ class Crosstalk: # {{{
if verbose:
print("INFO: subtract background ...", file=sys.stderr)
self.spectrumset.subtract_bkg(inplace=True,
- add_history=add_history, verbose=verbose)
+ add_history=add_history,
+ verbose=verbose)
# also apply background subtraction to the surrounding spectra
for specset in self.cross_in_specset:
specset.subtract_bkg(inplace=True,
- add_history=add_history, verbose=verbose)
+ add_history=add_history,
+ verbose=verbose)
# subtractions
if verbose:
print("INFO: apply subtractions ...", file=sys.stderr)
for specset, cross_arf in zip(self.cross_in_specset,
- self.cross_in_arf):
- self.spectrumset.subtract(spectrumset=specset,
- cross_arf=cross_arf, groupped=self.groupped,
- group_squeeze=group_squeeze, add_history=add_history,
- verbose=verbose)
+ self.cross_in_arf):
+ self.spectrumset.subtract(
+ spectrumset=specset, cross_arf=cross_arf,
+ groupped=self.groupped, group_squeeze=group_squeeze,
+ add_history=add_history, verbose=verbose)
# compensations
if verbose:
print("INFO: apply compensations ...", file=sys.stderr)
for cross_arf in self.cross_out_arf:
- self.spectrumset.compensate(cross_arf=cross_arf,
- groupped=self.groupped, group_squeeze=group_squeeze,
- add_history=add_history, verbose=verbose)
+ self.spectrumset.compensate(
+ cross_arf=cross_arf, groupped=self.groupped,
+ group_squeeze=group_squeeze, add_history=add_history,
+ verbose=verbose)
# fix negative values in channels
if fix_negative:
if verbose:
print("INFO: fix negative channel values ...", file=sys.stderr)
self.spectrumset.fix_negative(add_history=add_history,
- verbose=verbose)
+ verbose=verbose)
if add_history:
self.spectrumset.header.add_history("END Crosstalk Correction")
@@ -1255,10 +1265,6 @@ class Deprojection: # {{{
https://www-xray.ast.cam.ac.uk/papers/dsdeproj/
Sanders & Fabian 2007, MNRAS, 381, 1381
"""
- spectra = None
- grouping = None
- quality = None
-
def __init__(self, spectra, grouping=None, quality=None, verbose=False):
"""
Arguments:
@@ -1273,20 +1279,20 @@ class Deprojection: # {{{
raise ValueError("Not a 'SpectrumSet' object")
spec.read_xflt()
self.spectra.append(spec)
- self.spectra = spectra
+ self.spectra = spectra
self.grouping = grouping
- self.quality = quality
+ self.quality = quality
# sort spectra by `radius_outer'
self.spectra.sort(key=lambda x: x.radius_outer)
# set the inner radii
- radii_inner = [0.0] + [ x.radius_outer for x in self.spectra[:-1] ]
+ radii_inner = [0.0] + [x.radius_outer for x in self.spectra[:-1]]
for spec, rin in zip(self.spectra, radii_inner):
spec.set_radius_inner(rin)
if verbose:
print("Deprojection: loaded spectrum: radius: (%s, %s)" %
(spec.radius_inner, spec.radius_outer), file=sys.stderr)
# check EXPOSURE validity (all spectra must have the same exposures)
- exposures = [ spec.EXPOSURE for spec in self.spectra ]
+ exposures = [spec.EXPOSURE for spec in self.spectra]
assert np.allclose(exposures[:-1], exposures[1:])
def subtract_bkg(self, verbose=True):
@@ -1299,7 +1305,7 @@ class Deprojection: # {{{
def apply_grouping(self, verbose=False):
for spec in self.spectra:
spec.apply_grouping(grouping=self.grouping, quality=self.quality,
- verbose=verbose)
+ verbose=verbose)
def estimate_errors(self, gehrels=True):
for spec in self.spectra:
@@ -1313,7 +1319,7 @@ class Deprojection: # {{{
spec.scale()
def do_deprojection(self, group_squeeze=True,
- add_history=False, verbose=False):
+ add_history=False, verbose=False):
#
# TODO/XXX: How to apply ARF correction here???
#
@@ -1326,7 +1332,7 @@ class Deprojection: # {{{
for shellnum in reversed(range(num_spec)):
if verbose:
print("DEPROJECTION: deprojecting shell %d ..." % shellnum,
- file=sys.stderr)
+ file=sys.stderr)
spec = self.spectra[shellnum]
# calculate projected spectrum of outlying shells
proj_spec = np.zeros(spec_shape, spec_dtype)
@@ -1357,8 +1363,8 @@ class Deprojection: # {{{
Extract the spectral data of each spectrum after deprojection
performed.
"""
- return [ spec.get_data(group_squeeze=group_squeeze, copy=copy)
- for spec in self.spectra ]
+ return [spec.get_data(group_squeeze=group_squeeze, copy=copy)
+ for spec in self.spectra]
def set_spec_data(self, spec_data, group_squeeze=True):
"""
@@ -1397,7 +1403,7 @@ class Deprojection: # {{{
Write the deprojected spectra to output file.
"""
if filenames == []:
- filenames = [ spec.outfile for spec in self.spectra ]
+ filenames = [spec.outfile for spec in self.spectra]
for spec, outfile in zip(self.spectra, filenames):
spec.write(filename=outfile, clobber=clobber)
@@ -1445,10 +1451,10 @@ def calc_median_errors(results):
# sort by the Monte Carlo simulation axis
results.sort(0)
mc_times = results.shape[0]
- medians = results[ int(mc_times * 0.5) ]
- lowerpcs = results[ int(mc_times * 0.1585) ]
- upperpcs = results[ int(mc_times * 0.8415) ]
- errors = np.sqrt(0.5 * ((medians-lowerpcs)**2 + (upperpcs-medians)**2))
+ medians = results[int(mc_times * 0.5)]
+ lowerpcs = results[int(mc_times * 0.1585)]
+ upperpcs = results[int(mc_times * 0.8415)]
+ errors = np.sqrt(0.5 * ((medians-lowerpcs)**2 + (upperpcs-medians)**2))
return (medians, errors)
@@ -1465,7 +1471,7 @@ def set_argument(name, default, cmdargs, config):
# main routine {{{
def main(config, subtract_bkg, fix_negative, mc_times,
- verbose=False, clobber=False):
+ verbose=False, clobber=False):
# collect ARFs and RMFs into dictionaries (avoid interpolation every time)
arf_files = set()
rmf_files = set()
@@ -1480,9 +1486,9 @@ def main(config, subtract_bkg, fix_negative, mc_times,
for arf in config_reg["cross_out"].as_list("cross_arf"):
arf_files.add(arf)
arf_files = arf_files - set([None])
- arf_dict = { arf: ARF(arf) for arf in arf_files }
+ arf_dict = {arf: ARF(arf) for arf in arf_files}
rmf_files = rmf_files - set([None])
- rmf_dict = { rmf: RMF(rmf) for rmf in rmf_files }
+ rmf_dict = {rmf: RMF(rmf) for rmf in rmf_files}
if verbose:
print("INFO: arf_files:", arf_files, file=sys.stderr)
print("INFO: rmf_files:", rmf_files, file=sys.stderr)
@@ -1490,7 +1496,7 @@ def main(config, subtract_bkg, fix_negative, mc_times,
# get the GROUPING and QUALITY data
grouping_fits = fits.open(config["grouping"])
grouping = grouping_fits["SPECTRUM"].data.columns["GROUPING"].array
- quality = grouping_fits["SPECTRUM"].data.columns["QUALITY"].array
+ quality = grouping_fits["SPECTRUM"].data.columns["QUALITY"].array
# squeeze the groupped spectral data, etc.
group_squeeze = True
@@ -1504,8 +1510,8 @@ def main(config, subtract_bkg, fix_negative, mc_times,
if verbose:
print("INFO: processing '%s' ..." % region, file=sys.stderr)
crosstalk = Crosstalk(config.get(region),
- arf_dict=arf_dict, rmf_dict=rmf_dict,
- grouping=grouping, quality=quality)
+ arf_dict=arf_dict, rmf_dict=rmf_dict,
+ grouping=grouping, quality=quality)
crosstalk.apply_grouping(verbose=verbose)
crosstalk.estimate_errors(verbose=verbose)
# keep a (almost) clean copy of the crosstalk object
@@ -1513,24 +1519,26 @@ def main(config, subtract_bkg, fix_negative, mc_times,
if verbose:
print("INFO: doing crosstalk correction ...", file=sys.stderr)
crosstalk.do_correction(subtract_bkg=subtract_bkg,
- fix_negative=fix_negative, group_squeeze=group_squeeze,
- add_history=True, verbose=verbose)
+ fix_negative=fix_negative,
+ group_squeeze=group_squeeze,
+ add_history=True, verbose=verbose)
cc_spectra.append(crosstalk.get_spectrum(copy=True))
# load back the crosstalk-corrected spectra for deprojection
if verbose:
print("INFO: preparing spectra for deprojection ...", file=sys.stderr)
deprojection = Deprojection(spectra=cc_spectra, grouping=grouping,
- quality=quality, verbose=verbose)
+ quality=quality, verbose=verbose)
if verbose:
print("INFO: scaling spectra according the region angular size...",
- file=sys.stderr)
+ file=sys.stderr)
deprojection.scale()
if verbose:
print("INFO: doing deprojection ...", file=sys.stderr)
deprojection.do_deprojection(add_history=True, verbose=verbose)
- deproj_results = [ deprojection.get_spec_data(
- group_squeeze=group_squeeze, copy=True) ]
+ deproj_results = []
+ deproj_results.append(deprojection.get_spec_data(
+ group_squeeze=group_squeeze, copy=True))
# Monte Carlo for spectral group error estimation
print("INFO: Monte Carlo to estimate spectral errors (%d times) ..." %
@@ -1544,12 +1552,14 @@ def main(config, subtract_bkg, fix_negative, mc_times,
# copy and randomize
crosstalk_copy = crosstalk.copy().randomize()
crosstalk_copy.do_correction(subtract_bkg=subtract_bkg,
- fix_negative=fix_negative, group_squeeze=group_squeeze,
- add_history=False, verbose=False)
+ fix_negative=fix_negative,
+ group_squeeze=group_squeeze,
+ add_history=False, verbose=False)
cc_spectra_copy.append(crosstalk_copy.get_spectrum(copy=True))
# deproject spectra
deprojection_copy = Deprojection(spectra=cc_spectra_copy,
- grouping=grouping, quality=quality, verbose=False)
+ grouping=grouping,
+ quality=quality, verbose=False)
deprojection_copy.scale()
deprojection_copy.do_deprojection(add_history=False, verbose=False)
deproj_results.append(deprojection_copy.get_spec_data(
@@ -1558,7 +1568,7 @@ def main(config, subtract_bkg, fix_negative, mc_times,
if verbose:
print("INFO: Calculating the median and errors for each spectrum ...",
- file=sys.stderr)
+ file=sys.stderr)
medians, errors = calc_median_errors(deproj_results)
deprojection.set_spec_data(medians, group_squeeze=group_squeeze)
deprojection.add_stat_err(errors, group_squeeze=group_squeeze)
@@ -1583,9 +1593,9 @@ def main_deprojection(config, mc_times, verbose=False, clobber=False):
arf_files.add(config_reg.get("arf"))
rmf_files.add(config_reg.get("rmf"))
arf_files = arf_files - set([None])
- arf_dict = { arf: ARF(arf) for arf in arf_files }
+ arf_dict = {arf: ARF(arf) for arf in arf_files}
rmf_files = rmf_files - set([None])
- rmf_dict = { rmf: RMF(rmf) for rmf in rmf_files }
+ rmf_dict = {rmf: RMF(rmf) for rmf in rmf_files}
if verbose:
print("INFO: arf_files:", arf_files, file=sys.stderr)
print("INFO: rmf_files:", rmf_files, file=sys.stderr)
@@ -1593,7 +1603,7 @@ def main_deprojection(config, mc_times, verbose=False, clobber=False):
# get the GROUPING and QUALITY data
grouping_fits = fits.open(config["grouping"])
grouping = grouping_fits["SPECTRUM"].data.columns["GROUPING"].array
- quality = grouping_fits["SPECTRUM"].data.columns["QUALITY"].array
+ quality = grouping_fits["SPECTRUM"].data.columns["QUALITY"].array
# squeeze the groupped spectral data, etc.
group_squeeze = True
@@ -1603,24 +1613,24 @@ def main_deprojection(config, mc_times, verbose=False, clobber=False):
proj_spectra = []
for region in config.sections:
config_reg = config[region]
- specset = SpectrumSet(filename=config_reg["spec"],
- outfile=config_reg["outfile"],
- arf=arf_dict.get(config_reg["arf"], config_reg["arf"]),
- rmf=rmf_dict.get(config_reg["rmf"], config_reg["rmf"]),
- bkg=config_reg["bkg"])
+ specset = SpectrumSet(
+ filename=config_reg["spec"], outfile=config_reg["outfile"],
+ arf=arf_dict.get(config_reg["arf"], config_reg["arf"]),
+ rmf=rmf_dict.get(config_reg["rmf"], config_reg["rmf"]),
+ bkg=config_reg["bkg"])
proj_spectra.append(specset)
deprojection = Deprojection(spectra=proj_spectra, grouping=grouping,
- quality=quality, verbose=verbose)
+ quality=quality, verbose=verbose)
deprojection.apply_grouping(verbose=verbose)
deprojection.estimate_errors()
if verbose:
print("INFO: scaling spectra according the region angular size ...",
- file=sys.stderr)
+ file=sys.stderr)
deprojection.scale()
# keep a (almost) clean copy of the input projected spectra
- proj_spectra_cleancopy = [ spec.copy() for spec in proj_spectra ]
+ proj_spectra_cleancopy = [spec.copy() for spec in proj_spectra]
if verbose:
print("INFO: subtract the background ...", file=sys.stderr)
@@ -1628,8 +1638,9 @@ def main_deprojection(config, mc_times, verbose=False, clobber=False):
if verbose:
print("INFO: doing deprojection ...", file=sys.stderr)
deprojection.do_deprojection(add_history=True, verbose=verbose)
- deproj_results = [ deprojection.get_spec_data(
- group_squeeze=group_squeeze, copy=True) ]
+ deproj_results = []
+ deproj_results.append(deprojection.get_spec_data(
+ group_squeeze=group_squeeze, copy=True))
# Monte Carlo for spectral group error estimation
print("INFO: Monte Carlo to estimate spectral errors (%d times) ..." %
@@ -1638,11 +1649,12 @@ def main_deprojection(config, mc_times, verbose=False, clobber=False):
if i % 100 == 0:
print("%d..." % i, end="", flush=True, file=sys.stderr)
# copy and randomize the input projected spectra
- proj_spectra_copy = [ spec.copy().randomize()
- for spec in proj_spectra_cleancopy ]
+ proj_spectra_copy = [spec.copy().randomize()
+ for spec in proj_spectra_cleancopy]
# deproject spectra
deprojection_copy = Deprojection(spectra=proj_spectra_copy,
- grouping=grouping, quality=quality, verbose=False)
+ grouping=grouping,
+ quality=quality, verbose=False)
deprojection_copy.subtract_bkg(verbose=False)
deprojection_copy.do_deprojection(add_history=False, verbose=False)
deproj_results.append(deprojection_copy.get_spec_data(
@@ -1651,7 +1663,7 @@ def main_deprojection(config, mc_times, verbose=False, clobber=False):
if verbose:
print("INFO: Calculating the median and errors for each spectrum ...",
- file=sys.stderr)
+ file=sys.stderr)
medians, errors = calc_median_errors(deproj_results)
deprojection.set_spec_data(medians, group_squeeze=group_squeeze)
deprojection.add_stat_err(errors, group_squeeze=group_squeeze)
@@ -1665,7 +1677,7 @@ def main_deprojection(config, mc_times, verbose=False, clobber=False):
# main_crosstalk routine {{{
def main_crosstalk(config, subtract_bkg, fix_negative, mc_times,
- verbose=False, clobber=False):
+ verbose=False, clobber=False):
"""
Only perform the crosstalk correction.
"""
@@ -1683,9 +1695,9 @@ def main_crosstalk(config, subtract_bkg, fix_negative, mc_times,
for arf in config_reg["cross_out"].as_list("cross_arf"):
arf_files.add(arf)
arf_files = arf_files - set([None])
- arf_dict = { arf: ARF(arf) for arf in arf_files }
+ arf_dict = {arf: ARF(arf) for arf in arf_files}
rmf_files = rmf_files - set([None])
- rmf_dict = { rmf: RMF(rmf) for rmf in rmf_files }
+ rmf_dict = {rmf: RMF(rmf) for rmf in rmf_files}
if verbose:
print("INFO: arf_files:", arf_files, file=sys.stderr)
print("INFO: rmf_files:", rmf_files, file=sys.stderr)
@@ -1694,11 +1706,11 @@ def main_crosstalk(config, subtract_bkg, fix_negative, mc_times,
if "grouping" in config.keys():
grouping_fits = fits.open(config["grouping"])
grouping = grouping_fits["SPECTRUM"].data.columns["GROUPING"].array
- quality = grouping_fits["SPECTRUM"].data.columns["QUALITY"].array
+ quality = grouping_fits["SPECTRUM"].data.columns["QUALITY"].array
group_squeeze = True
else:
grouping = None
- quality = None
+ quality = None
group_squeeze = False
# crosstalk objects (BEFORE background subtraction)
@@ -1711,8 +1723,8 @@ def main_crosstalk(config, subtract_bkg, fix_negative, mc_times,
if verbose:
print("INFO: processing '%s' ..." % region, file=sys.stderr)
crosstalk = Crosstalk(config.get(region),
- arf_dict=arf_dict, rmf_dict=rmf_dict,
- grouping=grouping, quality=quality)
+ arf_dict=arf_dict, rmf_dict=rmf_dict,
+ grouping=grouping, quality=quality)
if grouping is not None:
crosstalk.apply_grouping(verbose=verbose)
crosstalk.estimate_errors(verbose=verbose)
@@ -1721,15 +1733,16 @@ def main_crosstalk(config, subtract_bkg, fix_negative, mc_times,
if verbose:
print("INFO: doing crosstalk correction ...", file=sys.stderr)
crosstalk.do_correction(subtract_bkg=subtract_bkg,
- fix_negative=fix_negative, group_squeeze=group_squeeze,
- add_history=True, verbose=verbose)
+ fix_negative=fix_negative,
+ group_squeeze=group_squeeze,
+ add_history=True, verbose=verbose)
crosstalk.fix_header()
cc_spectra.append(crosstalk.get_spectrum(copy=True))
# spectral data of the crosstalk-corrected spectra
cc_results = []
- cc_results.append([ spec.get_data(group_squeeze=group_squeeze, copy=True)
- for spec in cc_spectra ])
+ cc_results = ([spec.get_data(group_squeeze=group_squeeze, copy=True)
+ for spec in cc_spectra])
# Monte Carlo for spectral group error estimation
print("INFO: Monte Carlo to estimate spectral errors (%d times) ..." %
@@ -1743,17 +1756,18 @@ def main_crosstalk(config, subtract_bkg, fix_negative, mc_times,
# copy and randomize
crosstalk_copy = crosstalk.copy().randomize()
crosstalk_copy.do_correction(subtract_bkg=subtract_bkg,
- fix_negative=fix_negative, group_squeeze=group_squeeze,
- add_history=False, verbose=False)
+ fix_negative=fix_negative,
+ group_squeeze=group_squeeze,
+ add_history=False, verbose=False)
cc_spectra_copy.append(crosstalk_copy.get_spectrum(copy=True))
- cc_results.append([ spec.get_data(group_squeeze=group_squeeze,
- copy=True)
- for spec in cc_spectra_copy ])
+ cc_results.append([spec.get_data(group_squeeze=group_squeeze,
+ copy=True)
+ for spec in cc_spectra_copy])
print("DONE!", flush=True, file=sys.stderr)
if verbose:
print("INFO: Calculating the median and errors for each spectrum ...",
- file=sys.stderr)
+ file=sys.stderr)
medians, errors = calc_median_errors(cc_results)
if verbose:
print("INFO: Writing the crosstalk-corrected spectra " +
@@ -1807,16 +1821,16 @@ if __name__ == "__main__":
if mode.lower() == "both":
print("MODE: CROSSTALK + DEPROJECTION", file=sys.stderr)
main(config, subtract_bkg=subtract_bkg, fix_negative=fix_negative,
- mc_times=mc_times, verbose=verbose, clobber=clobber)
+ mc_times=mc_times, verbose=verbose, clobber=clobber)
elif mode.lower() == "deprojection":
print("MODE: DEPROJECTION", file=sys.stderr)
main_deprojection(config, mc_times=mc_times,
- verbose=verbose, clobber=clobber)
+ verbose=verbose, clobber=clobber)
elif mode.lower() == "crosstalk":
print("MODE: CROSSTALK", file=sys.stderr)
main_crosstalk(config, subtract_bkg=subtract_bkg,
- fix_negative=fix_negative, mc_times=mc_times,
- verbose=verbose, clobber=clobber)
+ fix_negative=fix_negative, mc_times=mc_times,
+ verbose=verbose, clobber=clobber)
else:
raise ValueError("Invalid operation mode: %s" % mode)
print(WARNING)