From 3258cd772fd975c7cd948b13f30940e893d3b77f Mon Sep 17 00:00:00 2001 From: Aaron LI Date: Sat, 26 Aug 2017 12:13:39 +0800 Subject: freefree.py: Update to use "sky" module; cleanups --- fg21sim/galactic/freefree.py | 163 ++++++++++++++++--------------------------- 1 file changed, 59 insertions(+), 104 deletions(-) (limited to 'fg21sim/galactic/freefree.py') diff --git a/fg21sim/galactic/freefree.py b/fg21sim/galactic/freefree.py index 2db6f84..d3c663f 100644 --- a/fg21sim/galactic/freefree.py +++ b/fg21sim/galactic/freefree.py @@ -28,10 +28,8 @@ References import os import logging -from datetime import datetime, timezone import numpy as np -from astropy.io import fits import astropy.units as au from ..sky import get_sky @@ -51,8 +49,8 @@ class FreeFree: Parameters ---------- - configs : ConfigManager object - An `ConfigManager` object contains default and user configurations. + configs : `~ConfigManager` + An ``ConfigManager`` object contains default and user configurations. For more details, see the example config specification. Attributes @@ -60,15 +58,24 @@ class FreeFree: TODO """ # Component name + comp = "galactic/freefree" name = "Galactic free-free" def __init__(self, configs): self.configs = configs self._set_configs() + self.sky = get_sky(configs) + self.sky.add_header("CompID", self.compID, "Emission component ID") + self.sky.add_header("CompName", self.name, "Emission component") + self.sky.add_header("BUNIT", "K", "[Kelvin] Data unit") + self.sky.creator = __name__ + def _set_configs(self): - """Load the configs and set the corresponding class attributes.""" - comp = "galactic/freefree" + """ + Load the configs and set the corresponding class attributes. + """ + comp = self.compID self.halphamap_path = self.configs.get_path(comp+"/halphamap") self.halphamap_unit = au.Unit( self.configs.getn(comp+"/halphamap_unit")) @@ -79,15 +86,11 @@ class FreeFree: self.halpha_abs_th = self.configs.getn(comp+"/halpha_abs_th") # [mag] self.Te = self.configs.getn(comp+"/electron_temperature") # [K] self.prefix = self.configs.getn(comp+"/prefix") - self.save = self.configs.getn(comp+"/save") self.output_dir = self.configs.get_path(comp+"/output_dir") # self.filename_pattern = self.configs.getn("output/filename_pattern") - self.use_float = self.configs.getn("output/use_float") - self.checksum = self.configs.getn("output/checksum") self.clobber = self.configs.getn("output/clobber") self.frequencies = self.configs.frequencies # [MHz] - self.freq_unit = au.Unit(self.configs.getn("frequency/unit")) # logger.info("Loaded and set up configurations") @@ -95,15 +98,14 @@ class FreeFree: """ Load the Hα map, and 100-μm dust map. """ - sky = get_sky(self.configs) logger.info("Loading H[alpha] map ...") - self.halphamap = sky.open(self.halphamap_path) + self.halphamap = self.sky.open(self.halphamap_path) # Validate input map unit if self.halphamap_unit != au.Unit("Rayleigh"): raise ValueError("unsupported Halpha map unit: {0}".format( self.halphamap_unit)) logger.info("Loading dust map ...") - self.dustmap = sky.open(self.dustmap_path) + self.dustmap = self.sky.open(self.dustmap_path) # Validate input map unit if self.dustmap_unit != au.Unit("MJy / sr"): raise ValueError("unsupported dust map unit: {0}".format( @@ -118,7 +120,7 @@ class FreeFree: """ if hasattr(self, "_dust_corrected") and self._dust_corrected: return - # + logger.info("Correct H[alpha] map for dust absorption") logger.info("Effective dust fraction: {0}".format(self.f_dust)) # Mask the regions where the true Halpha absorption is uncertain. @@ -188,64 +190,26 @@ class FreeFree: h2f = 38.86 * a * nu**(-2.1) * 10**(290/self.Te) * self.Te**0.667 return h2f - def _make_filepath(self, **kwargs): + def _outfilepath(self, frequency, **kwargs): """ - Make the path of output file according to the filename pattern - and output directory loaded from configurations. - """ - data = { - "prefix": self.prefix, - } - data.update(kwargs) - filename = self.filename_pattern.format(**data) - filepath = os.path.join(self.output_dir, filename) - return filepath + Generate the path/filename to the output file for writing + the simulate sky images. - def _make_header(self): - """ - Make the header with detail information (e.g., parameters and - history) for the simulated products. - """ - header = fits.Header() - header["COMP"] = (self.name, "Emission component") - header["BUNIT"] = ("K", "data unit is Kelvin") - header["CREATOR"] = (__name__, "File creator") - # TODO: - history = [] - comments = [] - for hist in history: - header.add_history(hist) - for cmt in comments: - header.add_comment(cmt) - self.header = header - logger.info("Created FITS header") - - def output(self, skymap, frequency): - """ - Write the simulated free-free map to disk with proper header - keywords and history. + Parameters + ---------- + frequency : float + The frequency of the output sky image. + Unit: [MHz] Returns ------- - outfile : str - The (absolute) path to the output sky map file. + filepath : str + The generated filepath for the output sky file. """ - outfile = self._make_filepath(frequency=frequency) - if not hasattr(self, "header"): - self._make_header() - header = self.header.copy() - header["FREQ"] = (frequency, "Frequency [ MHz ]") - header["DATE"] = ( - datetime.now(timezone.utc).astimezone().isoformat(), - "File creation date" - ) - if self.use_float: - skymap = skymap.astype(np.float32) - sky = get_sky(configs=self.configs) - sky.data = skymap - sky.header = header - sky.write(outfile, clobber=self.clobber, checksum=self.checksum) - return outfile + filename = self.filename_pattern.format( + prefix=self.prefix, frequency=frequency, **kwargs) + filepath = os.path.join(self.output_dir, filename) + return filepath def preprocess(self): """ @@ -260,70 +224,61 @@ class FreeFree: """ if hasattr(self, "_preprocessed") and self._preprocessed: return - # + logger.info("{name}: preprocessing ...".format(name=self.name)) self._load_maps() - # Correct for dust absorption self._correct_dust_absorption() - # + self._preprocessed = True def simulate_frequency(self, frequency): """ Simulate the free-free map at the specified frequency. + Parameters + ---------- + frequency : float + The frequency where to simulate the emission map. + Unit: [MHz] + Returns ------- - skymap_f : 1D `~numpy.ndarray` - The sky map at the input frequency. - filepath : str - The (absolute) path to the output sky map if saved, - otherwise ``None``. + sky : `~SkyBase` + The simulated sky image as a new sky instance. """ - self.preprocess() - # - logger.info("Simulating {name} map at {freq} ({unit}) ...".format( - name=self.name, freq=frequency, unit=self.freq_unit)) + logger.info("Simulating {name} map at {freq:.2f} [MHz] ...".format( + name=self.name, freq=frequency)) + sky = self.sky.copy() + sky.frequency = frequency ratio_K_R = self._calc_halpha_to_freefree(frequency) - skymap_f = self.halphamap.data * ratio_K_R - # - if self.save: - filepath = self.output(skymap_f, frequency) - else: - filepath = None - return (skymap_f, filepath) + sky.data = self.halphamap * ratio_K_R + logger.info("Done simulate map at %.2f [MHz]." % frequency) + return sky def simulate(self, frequencies=None): """ - Simulate the synchrotron map at the specified frequencies. + Simulate the emission maps. Parameters ---------- frequencies : float, or list[float] - The frequencies where to simulate the foreground map. + The frequencies where to simulate the emission map. Unit: [MHz] Default: None (i.e., use ``self.frequencies``) - - Returns - ------- - skymaps : list[1D `~numpy.ndarray`] - List of sky maps at each frequency. - paths : list[str] - List of (absolute) path to the output sky maps. """ - if frequencies is not None: - frequencies = np.array(frequencies, ndmin=1) - else: + if frequencies is None: frequencies = self.frequencies + else: + frequencies = np.array(frequencies, ndmin=1) - skymaps = [] - paths = [] + logger.info("Simulating {name} ...".format(name=self.name)) for freq in frequencies: - skymap_f, outfile = self.simulate_frequency(freq) - skymaps.append(skymap_f) - paths.append(outfile) - return (skymaps, paths) + sky = self.simulate_frequency(freq) + outfile = self._outfilepath(frequency=freq) + sky.write(outfile) + logger.info("Done simulate {name}!".format(name=self.name)) def postprocess(self): """Perform the post-simulation operations before the end.""" - pass + logger.info("{name}: postprocessing ...".format(name=self.name)) + logger.info("^_^ nothing to do :-)") -- cgit v1.2.2