aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAaron LI <aly@aaronly.me>2017-12-12 14:07:29 +0800
committerAaron LI <aly@aaronly.me>2017-12-12 14:07:29 +0800
commit26dfdb16d9ff943084da0b21a8a1be1be0b80ead (patch)
treef32a7d8dfc982df1786c37c52d754723d3a73f70
parent48923e3afd3b8ee049b199d352e2912dceca5476 (diff)
downloadatoolbox-26dfdb16d9ff943084da0b21a8a1be1be0b80ead.tar.bz2
Add astro/suzaku/make_photon_list.py: implement ``mkphlist`` for suzaku simulation
-rwxr-xr-xastro/suzaku/make_photon_list.py390
1 files changed, 390 insertions, 0 deletions
diff --git a/astro/suzaku/make_photon_list.py b/astro/suzaku/make_photon_list.py
new file mode 100755
index 0000000..d8fe679
--- /dev/null
+++ b/astro/suzaku/make_photon_list.py
@@ -0,0 +1,390 @@
+#!/usr/bin/env python3
+#
+# Copyright (c) 2017 Weitian LI <weitian@aaronly.me>
+# MIT License
+#
+
+"""
+Make/simulate the X-ray photon list from the object's image and
+spectral models.
+
+The simulated X-ray photon list will be used to simulate the
+Suzaku event observation by ``xissim`` tool.
+
+This script is intended to replace and extend the abilities of the
+``mkphlist`` tool.
+
+NOTE
+----
+The environment variable ``HEADAS`` should be set in order to help
+locate the ``PyXspec`` module and XSPEC shared libraries.
+
+References
+----------
+* mkphlist: https://heasarc.gsfc.nasa.gov/lheasoft/ftools/headas/mkphlist.txt
+* xissim: https://heasarc.gsfc.nasa.gov/lheasoft/ftools/headas/xissim.txt
+* PyXspec: https://heasarc.gsfc.nasa.gov/xanadu/xspec/python/html/index.html
+
+Example Configuration File
+-----------------------------------------------------------------------
+# image to determine the photon counts distribution
+image: imgbox800_e500-7000_sm.fits
+# region (annuli below) center; in "image" coordinate
+center: [400, 399]
+nh: 0.03 # 1e22 [cm^-2]
+redshift: 0.0137
+# simulated photon energy range [keV]
+erange: [0.3, 10.0]
+# number of energy bins (logarithmic)
+ebins: 1000
+# total photon counts that will be generated
+counts: 300000
+# exposure [ks]
+exposure: 50
+# a set of annular regions, with several pie regions inside each
+# annulus; each pie region can have a different spectral model.
+regions:
+ # annulus 1, with 3 pies
+ - radius: [0, 100]
+ angle: [0, 120, 200]
+ temperature: [1.0, 1.5, 2.0]
+ abundance: [0.5, 1.0, 1.5]
+ weight: [1, 2, 1.5]
+ # annulus 2, with 3 pies
+ - radius: [100, 200]
+ angle: [0, 90, 250]
+ temperature: [0.5, 1.0, 1.5]
+ abundance: [1.5, 2.0, 1.0]
+ weight: [0.5, 1, 1.5]
+ # annulus 3, with 4 pies
+ - radius: [200, 400]
+ angle: [50, 150, 220, 300]
+ temperature: [0.8, 1.2, 1.5, 1.3]
+ abundance: [1.1, 2.0, 1.5, 1.2]
+ weight: [0.2, 1.5, 0.7, 2]
+clobber: True
+outfiles:
+ photons_table: photons.fits
+ counts_map: counts_map.fits
+ temperature_map: temperature_map.fits
+ abundance_map: abundance_map.fits
+-----------------------------------------------------------------------
+"""
+
+import os
+import sys
+
+try:
+ headas = os.environ["HEADAS"]
+ healib = os.path.join(headas, "lib")
+except KeyError:
+ raise ValueError("env variable 'HEADAS' not set")
+
+if ("LD_LIBRARY_PATH" not in os.environ) or (
+ os.environ["LD_LIBRARY_PATH"].find(healib) < 0):
+ os.environ["LD_LIBRARY_PATH"] = ":".join([
+ healib, os.environ.get("LD_LIBRARY_PATH", "")
+ ])
+ try:
+ # Hack the ``LD_LIBRARY_PATH`` to import Xspec
+ # Credit: https://stackoverflow.com/a/25457751/4856091
+ print("sys.argv:", sys.argv)
+ os.execv(sys.argv[0], sys.argv)
+ except Exception:
+ print("ERROR: failed to re-exec with new LD_LIBRARY_PATH")
+ raise
+
+sys.path.append(os.path.join(healib, "python"))
+import xspec
+print("Imported XSPEC!")
+
+import argparse
+import logging
+from pprint import pprint
+
+import yaml
+import numpy as np
+from astropy.io import fits
+from astropy.wcs import WCS
+
+
+logging.basicConfig(level=logging.INFO,
+ format="[%(levelname)s:%(lineno)d] %(message)s")
+logger = logging.getLogger()
+
+
+class Pie:
+ """
+ Pie region
+ """
+ def __init__(self, xc, yc, rin, rout, abegin, aend):
+ self.xc = xc
+ self.yc = yc
+ self.rin = rin
+ self.rout = rout
+ self.abegin = abegin # [deg] beginning angle
+ self.aend = aend # [deg] ending angle (may be > 360)
+ # spectral model parameters
+ self._modelpars = {}
+
+ @staticmethod
+ def cart2pol(x, y):
+ rho = np.sqrt(x**2 + y**2)
+ phi = 180 + np.rad2deg(np.arctan2(y, x)) # 0-360 [deg]
+ return (rho, phi)
+
+ def make_mask(self, shape):
+ try:
+ nrow, ncol = shape
+ except TypeError:
+ nrow = ncol = shape
+ # HACK: to make the masks consistent with ``rand_position()``
+ ix = self.xc - np.arange(ncol)
+ iy = self.yc - np.arange(nrow)
+ mx, my = np.meshgrid(ix, iy)
+ rho, phi = self.cart2pol(mx, my)
+ mask_rho = (rho >= self.rin) & (rho <= self.rout)
+ mask_phi = (phi >= self.abegin) & (phi <= self.aend)
+ if self.aend > 360:
+ mask_phi |= (phi <= (self.aend-360))
+ mask = mask_rho & mask_phi
+ return mask
+
+ def rand_position(self, n=None):
+ if n is None:
+ n = self.modelpar("counts")
+ theta = np.random.uniform(low=self.abegin, high=self.aend, size=n)
+ r = np.sqrt(np.random.uniform(low=self.rin**2, high=self.rout**2,
+ size=n))
+ x = r * np.cos(np.deg2rad(theta)) + self.xc
+ y = r * np.sin(np.deg2rad(theta)) + self.yc
+ return (x, y)
+
+ def modelpar(self, key=None, value=None):
+ if key is None:
+ return self._modelpars
+ elif value is None:
+ return self._modelpars.get(key)
+ else:
+ self._modelpars[key] = value
+
+ def set_model(self, nh, redshift):
+ model = xspec.Model("wabs*apec")
+ model.wabs.nH = nh
+ model.apec.Redshift = redshift
+ model.apec.kT = self.modelpar("temperature")
+ model.apec.Abundanc = self.modelpar("abundance")
+ self._model = model
+
+ def rand_photons(self, n=None):
+ if n is None:
+ n = self.modelpar("counts")
+ model = self._model
+ mvalues = np.array(model.values(0), dtype=float) # len: ebins
+ p = mvalues / mvalues.sum()
+ menergies = np.array(model.energies(0), dtype=float) # len: ebins+1
+ mebins = np.sqrt(menergies[1:] * menergies[:-1])
+ photons = np.random.choice(mebins, size=n, p=p)
+ return photons # [keV]
+
+
+class Regions:
+ """
+ Configured regions
+ """
+ def __init__(self, configs):
+ self.configs = configs
+ self.xc, self.yc = configs["center"]
+
+ @property
+ def rmax(self):
+ rmax = 0
+ for annulus in self.configs["regions"]:
+ rin, rout = annulus["radius"]
+ if rmax < rout:
+ rmax = rout
+ return rmax
+
+ def make_mask(self, shape):
+ try:
+ nrow, ncol = shape
+ except TypeError:
+ nrow = ncol = shape
+ ix = np.arange(ncol) - self.xc
+ iy = np.arange(nrow) - self.yc
+ mx, my = np.meshgrid(ix, iy)
+ rho = np.sqrt(mx**2 + my**2)
+ mask = (rho <= self.rmax)
+ return mask
+
+ @property
+ def regions(self):
+ reg_all = []
+ for annulus in self.configs["regions"]:
+ reg_annulus = []
+ rin, rout = annulus["radius"]
+ abegin = annulus["angle"]
+ aend = abegin[1:] + [abegin[0]+360]
+ npie = len(abegin)
+ temperature = annulus["temperature"]
+ abundance = annulus["abundance"]
+ weight = annulus.get("weight", [1]*npie)
+ for i in range(npie):
+ pie = Pie(xc=self.xc, yc=self.yc, rin=rin, rout=rout,
+ abegin=abegin[i], aend=aend[i])
+ pie.modelpar("temperature", temperature[i])
+ pie.modelpar("abundance", abundance[i])
+ pie.modelpar("weight", weight[i])
+ reg_annulus.append(pie)
+ reg_all.append(reg_annulus)
+ return reg_all
+
+
+def pixel2world(x, y, wcs):
+ pix = np.column_stack([x, y])
+ world = wcs.wcs_pix2world(pix, 0)
+ ra = world[:, 0]
+ dec = world[:, 1]
+ return (ra, dec) # [deg]
+
+
+def main():
+ parser = argparse.ArgumentParser(
+ description="Make/simulate X-ray photon list for Suzaku simulation")
+ parser.add_argument("config", help="configuration file in YAML format")
+ args = parser.parse_args()
+
+ configs = yaml.load(open(args.config))
+ logger.info("Load configuration file: %s" % args.config)
+ logger.info("Configurations:")
+ pprint(configs)
+
+ # Update XSPEC settings
+ emin, emax = configs["erange"] # [keV]
+ ebins = configs["ebins"]
+ xspec.AllModels.setEnergies("%.1f %.1f %d log" % (emin, emax, ebins))
+ logger.info("Energy range: [%.1f, %.1f] [keV]" % (emin, emax))
+ logger.info("Energy: %d logarithmic channels" % ebins)
+
+ with fits.open(configs["image"]) as f:
+ header = f[0].header
+ image = f[0].data
+ shape = image.shape
+ logger.info("Image size: %dx%d" % (shape[1], shape[0]))
+
+ wcs = WCS(header)
+ regions = Regions(configs)
+ reg_all = regions.regions
+ mask_all = regions.make_mask(shape=shape)
+ weight_all = np.sum(image[mask_all])
+
+ counts_all = configs["counts"]
+ logger.info("Total counts: %d" % counts_all)
+
+ logger.info("nH: %.4f [1e22 cm^-2]" % configs["nh"])
+ logger.info("Redshift: %.5f" % configs["redshift"])
+ exposure = configs["exposure"] * 1e3 # [s]
+ logger.info("Exposure time: %.1f [s]" % exposure)
+
+ logger.info("Determining photons counts in each region ...")
+ counts_sum = 0
+ for i, annulus in enumerate(reg_all):
+ for j, pie in enumerate(annulus):
+ label = "annu#%d/pie#%d" % (i+1, j+1)
+ mask = pie.make_mask(shape=shape)
+ pixels = np.sum(mask)
+ weight = np.sum(image[mask]) * pie.modelpar("weight")
+ counts = int(counts_all * weight / weight_all)
+ counts_sum += counts
+ pie.modelpar("pixels", pixels)
+ pie.modelpar("counts", counts)
+ logger.info("%s: %d pixels, %d photons" % (label, pixels, counts))
+
+ logger.info("Determined counts sum: %d" % counts_sum)
+ logger.info("Adjusting total counts -> %d" % counts_all)
+ for i, annulus in enumerate(reg_all):
+ for j, pie in enumerate(annulus):
+ label = "annu#%d/pie#%d" % (i+1, j+1)
+ counts_old = pie.modelpar("counts")
+ counts_new = round(counts_old * counts_all / counts_sum)
+ pie.modelpar("counts", counts_new)
+ logger.info("%s: adjusted photon counts: %d -> %d" %
+ (label, counts_old, counts_new))
+
+ # Output files
+ temp_map = np.zeros_like(image)
+ abund_map = np.zeros_like(image)
+ counts_map = np.zeros_like(image)
+ weights_map = np.zeros_like(image)
+ photonlist = []
+
+ for i, annulus in enumerate(reg_all):
+ for j, pie in enumerate(annulus):
+ label = "annu#%d/pie#%d" % (i+1, j+1)
+ pie.set_model(nh=configs["nh"], redshift=configs["redshift"])
+ mask = pie.make_mask(shape=shape)
+ temp = pie.modelpar("temperature")
+ abund = pie.modelpar("abundance")
+ counts = pie.modelpar("counts")
+ logger.info("%s: kT=%.2f, Z=%.2f, %d photons" %
+ (label, temp, abund, counts))
+
+ logger.info("%s: sampling photon positions ..." % label)
+ x, y = pie.rand_position(n=counts)
+ ra, dec = pixel2world(x, y, wcs=wcs)
+ logger.info("%s: sampling photon energies ..." % label)
+ energies = pie.rand_photons(n=counts)
+ time = np.random.uniform(low=0, high=exposure, size=counts)
+ photons = np.column_stack([time, energies, ra, dec])
+ photonlist.append(photons)
+
+ logger.info("%s: spatially binning photons ..." % label)
+ rbins = np.arange(shape[0]+1, dtype=int)
+ cbins = np.arange(shape[1]+1, dtype=int)
+ hist2d, __, __ = np.histogram2d(y, x, bins=(rbins, cbins))
+ counts_map += hist2d
+
+ temp_map[mask] = temp
+ abund_map[mask] = abund
+ weights_map[mask] = pie.modelpar("weight")
+
+ logger.info("Creating output FITS header ...")
+ header_out = fits.Header()
+ header_out.extend(wcs.to_header(), update=True)
+ header_out["CREATOR"] = os.path.basename(sys.argv[0])
+ header_out.add_history(" ".join(sys.argv))
+ logger.info("Creating photons table ...")
+ photons = np.row_stack(photonlist)
+ photons.sort(axis=0) # sort by time in place
+ hdu = fits.BinTableHDU.from_columns([
+ fits.Column(name="PHOTON_TIME", format="D", unit="s",
+ array=photons[:, 0]),
+ fits.Column(name="PHOTON_ENERGY", format="E", unit="keV",
+ array=photons[:, 1]),
+ fits.Column(name="RA", format="E", unit="deg", array=photons[:, 2]),
+ fits.Column(name="DEC", format="E", unit="deg", array=photons[:, 3]),
+ ], header=header_out)
+ hdu.name = "PHOTON_LIST"
+ outfile = configs["outfiles"]["photons_table"]
+ hdu.writeto(outfile, overwrite=configs["clobber"])
+ logger.info("Wrote photons table to: %s" % outfile)
+
+ data = np.stack([counts_map, weights_map], axis=0)
+ hdu = fits.PrimaryHDU(data=data, header=header_out)
+ outfile = configs["outfiles"]["counts_map"]
+ hdu.writeto(outfile, overwrite=configs["clobber"])
+ logger.info("Wrote counts/weights map to: %s" % outfile)
+ #
+ hdu = fits.PrimaryHDU(data=temp_map, header=header_out)
+ outfile = configs["outfiles"]["temperature_map"]
+ hdu.writeto(outfile, overwrite=configs["clobber"])
+ logger.info("Wrote temperature map to: %s" % outfile)
+ #
+ hdu = fits.PrimaryHDU(data=abund_map, header=header_out)
+ outfile = configs["outfiles"]["abundance_map"]
+ hdu.writeto(outfile, overwrite=configs["clobber"])
+ logger.info("Wrote abundance map to: %s" % outfile)
+
+
+if __name__ == "__main__":
+ main()