diff options
| -rw-r--r-- | fg21sim/galactic/freefree.py | 163 | 
1 files changed, 59 insertions, 104 deletions
| diff --git a/fg21sim/galactic/freefree.py b/fg21sim/galactic/freefree.py index 2db6f84..d3c663f 100644 --- a/fg21sim/galactic/freefree.py +++ b/fg21sim/galactic/freefree.py @@ -28,10 +28,8 @@ References  import os  import logging -from datetime import datetime, timezone  import numpy as np -from astropy.io import fits  import astropy.units as au  from ..sky import get_sky @@ -51,8 +49,8 @@ class FreeFree:      Parameters      ---------- -    configs : ConfigManager object -        An `ConfigManager` object contains default and user configurations. +    configs : `~ConfigManager` +        An ``ConfigManager`` object contains default and user configurations.          For more details, see the example config specification.      Attributes @@ -60,15 +58,24 @@ class FreeFree:      TODO      """      # Component name +    comp = "galactic/freefree"      name = "Galactic free-free"      def __init__(self, configs):          self.configs = configs          self._set_configs() +        self.sky = get_sky(configs) +        self.sky.add_header("CompID", self.compID, "Emission component ID") +        self.sky.add_header("CompName", self.name, "Emission component") +        self.sky.add_header("BUNIT", "K", "[Kelvin] Data unit") +        self.sky.creator = __name__ +      def _set_configs(self): -        """Load the configs and set the corresponding class attributes.""" -        comp = "galactic/freefree" +        """ +        Load the configs and set the corresponding class attributes. +        """ +        comp = self.compID          self.halphamap_path = self.configs.get_path(comp+"/halphamap")          self.halphamap_unit = au.Unit(              self.configs.getn(comp+"/halphamap_unit")) @@ -79,15 +86,11 @@ class FreeFree:          self.halpha_abs_th = self.configs.getn(comp+"/halpha_abs_th")  # [mag]          self.Te = self.configs.getn(comp+"/electron_temperature")  # [K]          self.prefix = self.configs.getn(comp+"/prefix") -        self.save = self.configs.getn(comp+"/save")          self.output_dir = self.configs.get_path(comp+"/output_dir")          #          self.filename_pattern = self.configs.getn("output/filename_pattern") -        self.use_float = self.configs.getn("output/use_float") -        self.checksum = self.configs.getn("output/checksum")          self.clobber = self.configs.getn("output/clobber")          self.frequencies = self.configs.frequencies  # [MHz] -        self.freq_unit = au.Unit(self.configs.getn("frequency/unit"))          #          logger.info("Loaded and set up configurations") @@ -95,15 +98,14 @@ class FreeFree:          """          Load the Hα map, and 100-μm dust map.          """ -        sky = get_sky(self.configs)          logger.info("Loading H[alpha] map ...") -        self.halphamap = sky.open(self.halphamap_path) +        self.halphamap = self.sky.open(self.halphamap_path)          # Validate input map unit          if self.halphamap_unit != au.Unit("Rayleigh"):              raise ValueError("unsupported Halpha map unit: {0}".format(                  self.halphamap_unit))          logger.info("Loading dust map ...") -        self.dustmap = sky.open(self.dustmap_path) +        self.dustmap = self.sky.open(self.dustmap_path)          # Validate input map unit          if self.dustmap_unit != au.Unit("MJy / sr"):              raise ValueError("unsupported dust map unit: {0}".format( @@ -118,7 +120,7 @@ class FreeFree:          """          if hasattr(self, "_dust_corrected") and self._dust_corrected:              return -        # +          logger.info("Correct H[alpha] map for dust absorption")          logger.info("Effective dust fraction: {0}".format(self.f_dust))          # Mask the regions where the true Halpha absorption is uncertain. @@ -188,64 +190,26 @@ class FreeFree:          h2f = 38.86 * a * nu**(-2.1) * 10**(290/self.Te) * self.Te**0.667          return h2f -    def _make_filepath(self, **kwargs): +    def _outfilepath(self, frequency, **kwargs):          """ -        Make the path of output file according to the filename pattern -        and output directory loaded from configurations. -        """ -        data = { -            "prefix": self.prefix, -        } -        data.update(kwargs) -        filename = self.filename_pattern.format(**data) -        filepath = os.path.join(self.output_dir, filename) -        return filepath +        Generate the path/filename to the output file for writing +        the simulate sky images. -    def _make_header(self): -        """ -        Make the header with detail information (e.g., parameters and -        history) for the simulated products. -        """ -        header = fits.Header() -        header["COMP"] = (self.name, "Emission component") -        header["BUNIT"] = ("K", "data unit is Kelvin") -        header["CREATOR"] = (__name__, "File creator") -        # TODO: -        history = [] -        comments = [] -        for hist in history: -            header.add_history(hist) -        for cmt in comments: -            header.add_comment(cmt) -        self.header = header -        logger.info("Created FITS header") - -    def output(self, skymap, frequency): -        """ -        Write the simulated free-free map to disk with proper header -        keywords and history. +        Parameters +        ---------- +        frequency : float +            The frequency of the output sky image. +            Unit: [MHz]          Returns          ------- -        outfile : str -            The (absolute) path to the output sky map file. +        filepath : str +            The generated filepath for the output sky file.          """ -        outfile = self._make_filepath(frequency=frequency) -        if not hasattr(self, "header"): -            self._make_header() -        header = self.header.copy() -        header["FREQ"] = (frequency, "Frequency [ MHz ]") -        header["DATE"] = ( -            datetime.now(timezone.utc).astimezone().isoformat(), -            "File creation date" -        ) -        if self.use_float: -            skymap = skymap.astype(np.float32) -        sky = get_sky(configs=self.configs) -        sky.data = skymap -        sky.header = header -        sky.write(outfile, clobber=self.clobber, checksum=self.checksum) -        return outfile +        filename = self.filename_pattern.format( +            prefix=self.prefix, frequency=frequency, **kwargs) +        filepath = os.path.join(self.output_dir, filename) +        return filepath      def preprocess(self):          """ @@ -260,70 +224,61 @@ class FreeFree:          """          if hasattr(self, "_preprocessed") and self._preprocessed:              return -        # +          logger.info("{name}: preprocessing ...".format(name=self.name))          self._load_maps() -        # Correct for dust absorption          self._correct_dust_absorption() -        # +          self._preprocessed = True      def simulate_frequency(self, frequency):          """          Simulate the free-free map at the specified frequency. +        Parameters +        ---------- +        frequency : float +            The frequency where to simulate the emission map. +            Unit: [MHz] +          Returns          ------- -        skymap_f : 1D `~numpy.ndarray` -            The sky map at the input frequency. -        filepath : str -            The (absolute) path to the output sky map if saved, -            otherwise ``None``. +        sky : `~SkyBase` +            The simulated sky image as a new sky instance.          """ -        self.preprocess() -        # -        logger.info("Simulating {name} map at {freq} ({unit}) ...".format( -            name=self.name, freq=frequency, unit=self.freq_unit)) +        logger.info("Simulating {name} map at {freq:.2f} [MHz] ...".format( +            name=self.name, freq=frequency)) +        sky = self.sky.copy() +        sky.frequency = frequency          ratio_K_R = self._calc_halpha_to_freefree(frequency) -        skymap_f = self.halphamap.data * ratio_K_R -        # -        if self.save: -            filepath = self.output(skymap_f, frequency) -        else: -            filepath = None -        return (skymap_f, filepath) +        sky.data = self.halphamap * ratio_K_R +        logger.info("Done simulate map at %.2f [MHz]." % frequency) +        return sky      def simulate(self, frequencies=None):          """ -        Simulate the synchrotron map at the specified frequencies. +        Simulate the emission maps.          Parameters          ----------          frequencies : float, or list[float] -            The frequencies where to simulate the foreground map. +            The frequencies where to simulate the emission map.              Unit: [MHz]              Default: None (i.e., use ``self.frequencies``) - -        Returns -        ------- -        skymaps : list[1D `~numpy.ndarray`] -            List of sky maps at each frequency. -        paths : list[str] -            List of (absolute) path to the output sky maps.          """ -        if frequencies is not None: -            frequencies = np.array(frequencies, ndmin=1) -        else: +        if frequencies is None:              frequencies = self.frequencies +        else: +            frequencies = np.array(frequencies, ndmin=1) -        skymaps = [] -        paths = [] +        logger.info("Simulating {name} ...".format(name=self.name))          for freq in frequencies: -            skymap_f, outfile = self.simulate_frequency(freq) -            skymaps.append(skymap_f) -            paths.append(outfile) -        return (skymaps, paths) +            sky = self.simulate_frequency(freq) +            outfile = self._outfilepath(frequency=freq) +            sky.write(outfile) +        logger.info("Done simulate {name}!".format(name=self.name))      def postprocess(self):          """Perform the post-simulation operations before the end.""" -        pass +        logger.info("{name}: postprocessing ...".format(name=self.name)) +        logger.info("^_^ nothing to do :-)") | 
