From 7d6db498b2993084c12640aa505ca459476126d2 Mon Sep 17 00:00:00 2001 From: Aaron LI Date: Sat, 26 Aug 2017 11:20:24 +0800 Subject: synchrotron.py: Update to use "sky" module; significant cleanups --- fg21sim/galactic/synchrotron.py | 180 +++++++++++++++------------------------- 1 file changed, 69 insertions(+), 111 deletions(-) (limited to 'fg21sim') diff --git a/fg21sim/galactic/synchrotron.py b/fg21sim/galactic/synchrotron.py index 0f89a99..f2de2ce 100644 --- a/fg21sim/galactic/synchrotron.py +++ b/fg21sim/galactic/synchrotron.py @@ -7,13 +7,11 @@ Diffuse Galactic synchrotron emission (unpolarized) simulations. import os import logging -from datetime import datetime, timezone import numpy as np -from astropy.io import fits -import astropy.units as au import healpy as hp +from ..share import CONFIGS from ..sky import get_sky @@ -27,32 +25,41 @@ class Synchrotron: Parameters ---------- - configs : ConfigManager object + configs : `~ConfigManager` An `ConfigManager` object contains default and user configurations. For more details, see the example config specification. Attributes ---------- - ??? + sky : `~SkyBase` + The sky instance to deal with the simulation sky as well as the + output map. References ---------- ??? """ # Component name + compID = "galactic/synchrotron" name = "Galactic synchrotron (unpolarized)" - def __init__(self, configs): + def __init__(self, configs=CONFIGS): self.configs = configs self._set_configs() + self.sky = get_sky(configs) + self.sky.add_header("CompID", self.compID, "Emission component ID") + self.sky.add_header("CompName", self.name, "Emission component") + self.sky.add_header("BUNIT", "K", "[Kelvin] Data unit") + self.sky.creator = __name__ + def _set_configs(self): - """Load the configs and set the corresponding class attributes.""" - comp = "galactic/synchrotron" + """ + Load the configs and set the corresponding class attributes. + """ + comp = self.compID self.template_path = self.configs.get_path(comp+"/template") self.template_freq = self.configs.getn(comp+"/template_freq") - self.template_unit = au.Unit( - self.configs.getn(comp+"/template_unit")) self.indexmap_path = self.configs.get_path(comp+"/indexmap") self.add_smallscales = self.configs.getn(comp+"/add_smallscales") self.smallscales_added = False @@ -61,22 +68,18 @@ class Synchrotron: self.prefix = self.configs.getn(comp+"/prefix") self.output_dir = self.configs.get_path(comp+"/output_dir") # output + self.frequencies = self.configs.frequencies # [MHz] self.filename_pattern = self.configs.getn("output/filename_pattern") - self.use_float = self.configs.getn("output/use_float") - self.checksum = self.configs.getn("output/checksum") self.clobber = self.configs.getn("output/clobber") - self.frequencies = self.configs.frequencies # [MHz] - self.freq_unit = au.Unit(self.configs.getn("frequency/unit")) # logger.info("Loaded and setup configurations") def _load_maps(self): """Load the template map and spectral index map.""" - sky = get_sky(self.configs) logger.info("Loading template map ...") - self.template = sky.open(self.template_path) + self.template = self.sky.open(self.template_path) logger.info("Loading spectral index map ...") - self.indexmap = sky.open(self.indexmap_path) + self.indexmap = self.sky.open(self.indexmap_path) def _add_smallscales(self): """ @@ -104,8 +107,8 @@ class Synchrotron: logger.warning("Input template map is NOT a HEALPix map; " + "skip adding small-scale fluctuations!") return - # To add small scale fluctuations - # model: Remazeilles15 + + # Parameters to extrapolate the angular power spectrum gamma = -2.703 # index of the power spectrum between l [30, 90] sigma_tp = 56 # original beam resolution of the template [ arcmin ] alpha = 0.0599 @@ -122,67 +125,29 @@ class Synchrotron: # whiten the Gaussian random field gss = (gss - gss.mean()) / gss.std() hpmap_smallscales = alpha * gss * self.template.data**beta - self.template.data += hpmap_smallscales + self.template += hpmap_smallscales logger.info("Added small-scale fluctuations to template map") - def _make_filepath(self, **kwargs): - """ - Make the path of output file according to the filename pattern - and output directory loaded from configurations. + def _outfilepath(self, frequency, **kwargs): """ - data = { - "prefix": self.prefix, - } - data.update(kwargs) - filename = self.filename_pattern.format(**data) - filepath = os.path.join(self.output_dir, filename) - return filepath + Generate the path/filename to the output file for writing + the simulate sky images. - def _make_header(self): - """ - Make the header with detail information (e.g., parameters and - history) for the simulated products. - """ - header = fits.Header() - header["COMP"] = (self.name, "Emission component") - header["BUNIT"] = ("K", "data unit is Kelvin") - header["CREATOR"] = (__name__, "File creator") - # TODO: - history = [] - comments = [] - for hist in history: - header.add_history(hist) - for cmt in comments: - header.add_comment(cmt) - self.header = header - logger.info("Created FITS header") - - def output(self, skymap, frequency): - """ - Write the simulated synchrotron map to disk with proper - header keywords and history. + Parameters + ---------- + frequency : float + The frequency of the output sky image. + Unit: [MHz] Returns ------- - outfile : str - The (absolute) path to the output sky map file. + filepath : str + The generated filepath for the output sky file. """ - outfile = self._make_filepath(frequency=frequency) - if not hasattr(self, "header"): - self._make_header() - header = self.header.copy() - header["FREQ"] = (frequency, "Frequency [ MHz ]") - header["DATE"] = ( - datetime.now(timezone.utc).astimezone().isoformat(), - "File creation date" - ) - if self.use_float: - skymap = skymap.astype(np.float32) - sky = get_sky(configs=self.configs) - sky.data = skymap - sky.header = header - sky.write(outfile, clobber=self.clobber, checksum=self.checksum) - return outfile + filename = self.filename_pattern.format( + prefix=self.prefix, frequency=frequency, **kwargs) + filepath = os.path.join(self.output_dir, filename) + return filepath def preprocess(self): """ @@ -197,39 +162,39 @@ class Synchrotron: """ if hasattr(self, "_preprocessed") and self._preprocessed: return - # + logger.info("{name}: preprocessing ...".format(name=self.name)) self._load_maps() self._add_smallscales() - # + self._preprocessed = True def simulate_frequency(self, frequency): """ - Transform the template map to the requested frequency, - according to the spectral model and using an spectral index map. + Simulate the emission at requested lower frequency by + extrapolating the template map using the power-law model and + the spectral index map. + + Parameters + ---------- + frequency : float + The frequency where to simulate the radio emission. + Unit: [MHz] Returns ------- - skymap_f : 1D `~numpy.ndarray` - The sky map at the input frequency. - filepath : str - The (absolute) path to the output sky map if saved, - otherwise ``None``. + sky : `~SkyBase` + The simulated sky image as a new sky instance. """ - self.preprocess() - # - logger.info("Simulating {name} map at {freq} ({unit}) ...".format( - name=self.name, freq=frequency, unit=self.freq_unit)) - skymap_f = (self.template.data * - (frequency/self.template_freq) ** - (-np.abs(self.indexmap.data))) - # - if self.save: - filepath = self.output(skymap_f, frequency) - else: - filepath = None - return (skymap_f, filepath) + logger.info("Simulating {name} map at {freq:.2f} [MHz] ...".format( + name=self.name, freq=frequency)) + sky = self.sky.copy() + sky.frequency = frequency + ff = frequency / self.template_freq + data = self.template * ff ** (-np.abs(self.indexmap)) + sky.data = data + logger.info("Done simulate map at %.2f [MHz]." % frequency) + return sky def simulate(self, frequencies=None): """ @@ -241,27 +206,20 @@ class Synchrotron: The frequencies where to simulate the foreground map. Unit: [MHz] Default: None (i.e., use ``self.frequencies``) - - Returns - ------- - skymaps : list[1D `~numpy.ndarray`] - List of sky maps at each frequency. - paths : list[str] - List of (absolute) path to the output sky maps. """ - if frequencies is not None: - frequencies = np.array(frequencies, ndmin=1) - else: + if frequencies is None: frequencies = self.frequencies + else: + frequencies = np.array(frequencies, ndmin=1) - skymaps = [] - paths = [] + logger.info("Simulating {name} ...".format(name=self.name)) for freq in frequencies: - skymap_f, outfile = self.simulate_frequency(freq) - skymaps.append(skymap_f) - paths.append(outfile) - return (skymaps, paths) + sky = self.simulate_frequency(freq) + outfile = self._outfilepath(frequency=freq) + sky.write(outfile) + logger.info("Done simulate {name}!".format(name=self.name)) def postprocess(self): """Perform the post-simulation operations before the end.""" - pass + logger.info("{name}: postprocessing ...".format(name=self.name)) + logger.info("^_^ nothing to do :-)") -- cgit v1.2.2