From 40c1dee62168b2bff91bab06027fb4b7e3145fc8 Mon Sep 17 00:00:00 2001 From: Aaron LI Date: Sat, 26 Aug 2017 12:28:01 +0800 Subject: snr.py: Update to use "sky" module XXX: significant cleanups required, to make `SkyPatch` works! --- fg21sim/galactic/snr.py | 153 ++++++++++++------------------------------------ 1 file changed, 39 insertions(+), 114 deletions(-) (limited to 'fg21sim/galactic') diff --git a/fg21sim/galactic/snr.py b/fg21sim/galactic/snr.py index 18d8262..e77e24e 100644 --- a/fg21sim/galactic/snr.py +++ b/fg21sim/galactic/snr.py @@ -25,13 +25,10 @@ References import os import logging -from datetime import datetime, timezone import numpy as np -from astropy.io import fits -from astropy.coordinates import SkyCoord -import astropy.units as au import pandas as pd +from astropy.coordinates import SkyCoord from ..sky import get_sky from ..utils.wcs import make_wcs @@ -69,29 +66,33 @@ class SuperNovaRemnants: TODO """ # Component name + compID = "galactic/snr" name = "Galactic SNRs" def __init__(self, configs): self.configs = configs - self.sky = get_sky(configs) self._set_configs() + self.sky = get_sky(configs) + self.sky.add_header("CompID", self.compID, "Emission component ID") + self.sky.add_header("CompName", self.name, "Emission component") + self.sky.add_header("BUNIT", "K", "[Kelvin] Data unit") + self.sky.creator = __name__ + def _set_configs(self): - """Load the configs and set the corresponding class attributes.""" - comp = "galactic/snr" + """ + Load the configs and set the corresponding class attributes. + """ + comp = self.compID self.catalog_path = self.configs.get_path(comp+"/catalog") self.catalog_outfile = self.configs.get_path(comp+"/catalog_outfile") self.resolution = self.configs.getn(comp+"/resolution") # [ arcsec ] self.prefix = self.configs.getn(comp+"/prefix") - self.save = self.configs.getn(comp+"/save") self.output_dir = self.configs.get_path(comp+"/output_dir") # self.filename_pattern = self.configs.getn("output/filename_pattern") - self.use_float = self.configs.getn("output/use_float") - self.checksum = self.configs.getn("output/checksum") self.clobber = self.configs.getn("output/clobber") self.frequencies = self.configs.frequencies # [MHz] - self.freq_unit = au.Unit(self.configs.getn("frequency/unit")) logger.info("Loaded and set up configurations") def _load_catalog(self): @@ -112,8 +113,8 @@ class SuperNovaRemnants: self.catalog_path)) logger.info("SNRs catalog data: {0} objects, {1} columns".format( nrow, ncol)) - # The flux densities are given at 1 GHz - self.catalog_flux_freq = (1.0*au.GHz).to(self.freq_unit).value + # The flux densities are given at 1 [GHz] -> 1000 [MHz] + self.catalog_flux_freq = 1e3 # [MHz] # Convert ``size_major`` and ``size_minor`` from unit [arcmin] # to [arcsec] self.catalog["size_major"] *= AUC.arcmin2arcsec @@ -312,7 +313,8 @@ class SuperNovaRemnants: ``data.key``. e.g., elements of `self.catalog.itertuples()` frequency : float - The simulation frequency (unit: `self.freq_unit`). + The simulation frequency. + Unit: [MHz] Returns ------- @@ -336,65 +338,6 @@ class SuperNovaRemnants: val = val * Tb return (idx, val) - def _make_filepath(self, **kwargs): - """ - Make the path of output file according to the filename pattern - and output directory loaded from configurations. - """ - data = { - "prefix": self.prefix, - } - data.update(kwargs) - filename = self.filename_pattern.format(**data) - filepath = os.path.join(self.output_dir, filename) - return filepath - - def _make_header(self): - """ - Make the header with detail information (e.g., parameters and - history) for the simulated products. - """ - header = fits.Header() - header["COMP"] = (self.name, "Emission component") - header["BUNIT"] = ("K", "data unit is Kelvin") - header["CREATOR"] = (__name__, "File creator") - # TODO: - history = [] - comments = [] - for hist in history: - header.add_history(hist) - for cmt in comments: - header.add_comment(cmt) - self.header = header - logger.info("Created FITS header") - - def output(self, skymap, frequency): - """ - Write the simulated free-free map to disk with proper header - keywords and history. - - Returns - ------- - outfile : str - The (absolute) path to the output sky map file. - """ - outfile = self._make_filepath(frequency=frequency) - if not hasattr(self, "header"): - self._make_header() - header = self.header.copy() - header["FREQ"] = (frequency, "Frequency [ MHz ]") - header["DATE"] = ( - datetime.now(timezone.utc).astimezone().isoformat(), - "File creation date" - ) - if self.use_float: - skymap = skymap.astype(np.float32) - sky = self.sky.copy() - sky.data = skymap - sky.header = header - sky.write(outfile, clobber=self.clobber, checksum=self.checksum) - return outfile - def preprocess(self): """ Perform the preparation procedures for the final simulations. @@ -403,19 +346,18 @@ class SuperNovaRemnants: ---------- _preprocessed : bool This attribute presents and is ``True`` after the preparation - procedures are performed, which indicates that it is ready to - do the final simulations. + procedures are performed. """ if hasattr(self, "_preprocessed") and self._preprocessed: return - # + logger.info("{name}: preprocessing ...".format(name=self.name)) self._load_catalog() self._filter_catalog() self._add_random_rotation() # Simulate the template maps for each SNR self._simulate_templates() - # + self._preprocessed = True def simulate_frequency(self, frequency): @@ -426,39 +368,31 @@ class SuperNovaRemnants: Parameters ---------- frequency : float - The simulation frequency (unit: `self.freq_unit`). + The simulation frequency. + Unit: [MHz] Returns ------- - skymap_f : 1D/2D `~numpy.ndarray` - The sky map at the input frequency. - filepath : str - The (absolute) path to the output sky map if saved, - otherwise ``None``. + sky : `~SkyBase` + The simulated sky image as a new sky instance. See Also -------- `self._simulate_template()` for more detailed description. """ - self.preprocess() - # - logger.info("Simulating {name} map at {freq} ({unit}) ...".format( - name=self.name, freq=frequency, unit=self.freq_unit)) - skymap_f = np.zeros(self.sky.shape) + logger.info("Simulating {name} map at {freq:.2f} [MHz] ...".format( + name=self.name, freq=frequency)) + sky = self.sky.copy() + sky.frequency = frequency for row in self.catalog.itertuples(): index, value = self._simulate_single(row, frequency) - skymap_f[index] += value - # - if self.save: - filepath = self.output(skymap_f, frequency) - else: - filepath = None - return (skymap_f, filepath) + sky.data[index] += value + logger.info("Done simulate map at %.2f [MHz]." % frequency) + return sky def simulate(self, frequencies=None): """ - Simulate the sky maps of all Galactic SNRs emission at every - specified frequency. + Simulate the sky maps of all Galactic SNRs emission. Parameters ---------- @@ -466,27 +400,18 @@ class SuperNovaRemnants: The frequencies where to simulate the foreground map. Unit: [MHz] Default: None (i.e., use ``self.frequencies``) - - - Returns - ------- - skymaps : list[1D `~numpy.ndarray`] - List of sky maps at each frequency. - paths : list[str] - List of (absolute) path to the output sky maps. """ - if frequencies is not None: - frequencies = np.array(frequencies, ndmin=1) - else: + if frequencies is None: frequencies = self.frequencies + else: + frequencies = np.array(frequencies, ndmin=1) - skymaps = [] - paths = [] + logger.info("Simulating {name} ...".format(name=self.name)) for freq in frequencies: - skymap_f, outfile = self.simulate_frequency(freq) - skymaps.append(skymap_f) - paths.append(outfile) - return (skymaps, paths) + sky = self.simulate_frequency(freq) + outfile = self._outfilepath(frequency=freq) + sky.write(outfile) + logger.info("Done simulate {name}!".format(name=self.name)) def postprocess(self): """Perform the post-simulation operations before the end.""" -- cgit v1.2.2