aboutsummaryrefslogtreecommitdiffstats
path: root/fg21sim/galactic/synchrotron.py
blob: b82b63661324edf467cc2671c122b72be405a715 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# Copyright (c) 2016 Weitian LI <liweitianux@live.com>
# MIT license

"""
Diffuse Galactic synchrotron emission (unpolarized) simulations.
"""

import os
import logging
from datetime import datetime, timezone

import numpy as np
from astropy.io import fits
import astropy.units as au
import healpy as hp

from ..utils import read_fits_healpix, write_fits_healpix


logger = logging.getLogger(__name__)


class Synchrotron:
    """
    Simulate the diffuse Galactic synchrotron emission based on an
    existing template.

    Parameters
    ----------
    configs : ConfigManager object
        An `ConfigManager` object contains default and user configurations.
        For more details, see the example config specification.

    Attributes
    ----------
    ???

    References
    ----------
    ???
    """
    # Component name
    name = "Galactic free-free"

    def __init__(self, configs):
        self.configs = configs
        self._set_configs()

    def _set_configs(self):
        """Load the configs and set the corresponding class attributes."""
        comp = "galactic/synchrotron"
        self.template_path = self.configs.get_path(comp+"/template")
        self.template_freq = self.configs.getn(comp+"/template_freq")
        self.template_unit = au.Unit(
            self.configs.getn(comp+"/template_unit"))
        self.indexmap_path = self.configs.get_path(comp+"/indexmap")
        self.smallscales = self.configs.getn(comp+"/add_smallscales")
        self.prefix = self.configs.getn(comp+"/prefix")
        self.save = self.configs.getn(comp+"/save")
        self.output_dir = self.configs.get_path(comp+"/output_dir")
        # output
        self.filename_pattern = self.configs.getn("output/filename_pattern")
        self.use_float = self.configs.getn("output/use_float")
        self.checksum = self.configs.getn("output/checksum")
        self.clobber = self.configs.getn("output/clobber")
        self.nside = self.configs.getn("common/nside")
        self.lmin = self.configs.getn("common/lmin")
        self.lmax = self.configs.getn("common/lmax")
        self.freq_unit = au.Unit(self.configs.getn("frequency/unit"))
        #
        logger.info("Loaded and setup configurations")

    def _load_template(self):
        """Load the template map, and upgrade/downgrade the resolution
        to match the output Nside.
        """
        self.template, self.template_header = read_fits_healpix(
            self.template_path)
        template_nside = self.template_header["NSIDE"]
        logger.info("Loaded template map from {0} (Nside={1})".format(
            self.template_path, template_nside))
        # Upgrade/downgrade resolution
        if template_nside != self.nside:
            self.template = hp.ud_grade(self.template, nside_out=self.nside)
            logger.info("Upgrade/downgrade template map from Nside "
                        "{0} to {1}".format(template_nside, self.nside))

    def _load_indexmap(self):
        """Load the spectral index map, and upgrade/downgrade the resolution
        to match the output Nside.
        """
        self.indexmap, self.indexmap_header = read_fits_healpix(
            self.indexmap_path)
        indexmap_nside = self.indexmap_header["NSIDE"]
        logger.info("Loaded spectral index map from {0} (Nside={1})".format(
            self.indexmap_path, indexmap_nside))
        # Upgrade/downgrade resolution
        if indexmap_nside != self.nside:
            self.indexmap = hp.ud_grade(self.indexmap, nside_out=self.nside)
            logger.info("Upgrade/downgrade spectral index map from Nside "
                        "{0} to {1}".format(indexmap_nside, self.nside))

    def _add_smallscales(self):
        """Add fluctuations on small scales to the template map.

        XXX/TODO:
        * Support using different models.
        * This should be extensible/plug-able, e.g., a separate module
          and allow easily add new models for use.

        References
        ----------
        [1] M. Remazeilles et al. 2015, MNRAS, 451, 4311-4327
            "An improved source-subtracted and destriped 408-MHz all-sky map"
            Sec. 4.2: Small-scale fluctuations
        """
        if (not self.smallscales) or (hasattr(self, "hpmap_smallscales")):
            return
        # To add small scale fluctuations
        # model: Remazeilles15
        gamma = -2.703  # index of the power spectrum between l [30, 90]
        sigma_tp = 56  # original beam resolution of the template [ arcmin ]
        alpha = 0.0599
        beta = 0.782
        # angular power spectrum of the Gaussian random field
        ell = np.arange(self.lmax+1).astype(np.int)
        cl = np.zeros(ell.shape)
        ell_idx = ell >= self.lmin
        cl[ell_idx] = (ell[ell_idx] ** gamma *
                       1.0 - np.exp(-ell[ell_idx]**2 * sigma_tp**2))
        cl[ell < self.lmin] = cl[self.lmin]
        # generate a realization of the Gaussian random field
        gss = hp.synfast(cls=cl, nside=self.nside, new=True)
        # whiten the Gaussian random field
        gss = (gss - gss.mean()) / gss.std()
        self.hpmap_smallscales = alpha * gss * self.template**beta
        self.template += self.hpmap_smallscales
        logger.info("Added small-scale fluctuations")

    def _make_filepath(self, **kwargs):
        """Make the path of output file according to the filename pattern
        and output directory loaded from configurations.
        """
        data = {
            "prefix": self.prefix,
        }
        data.update(kwargs)
        filename = self.filename_pattern.format(**data)
        filetype = self.configs.getn("output/filetype")
        if filetype == "fits":
            filename += ".fits"
        else:
            raise NotImplementedError("unsupported filetype: %s" % filetype)
        filepath = os.path.join(self.output_dir, filename)
        return filepath

    def _make_header(self):
        """Make the header with detail information (e.g., parameters and
        history) for the simulated products.
        """
        header = fits.Header()
        header["COMP"] = ("Galactic synchrotron (unpolarized)",
                          "Emission component")
        header["UNIT"] = ("Kelvin", "Map unit")
        header["CREATOR"] = (__name__, "File creator")
        # TODO:
        history = []
        comments = []
        for hist in history:
            header.add_history(hist)
        for cmt in comments:
            header.add_comment(cmt)
        self.header = header
        logger.info("Created FITS header")

    def output(self, hpmap, frequency):
        """Write the simulated synchrotron map to disk with proper
        header keywords and history.
        """
        if not os.path.exists(self.output_dir):
            os.mkdir(self.output_dir)
            logger.info("Created output dir: {0}".format(self.output_dir))
        #
        filepath = self._make_filepath(frequency=frequency)
        if not hasattr(self, "header"):
            self._make_header()
        header = self.header.copy()
        header["FREQ"] = (frequency, "Frequency [ MHz ]")
        header["DATE"] = (
            datetime.now(timezone.utc).astimezone().isoformat(),
            "File creation date"
        )
        if self.use_float:
            hpmap = hpmap.astype(np.float32)
        write_fits_healpix(filepath, hpmap, header=header,
                           clobber=self.clobber, checksum=self.checksum)
        logger.info("Write simulated map to file: {0}".format(filepath))

    def preprocess(self):
        """Perform the preparation procedures for the final simulations.

        Attributes
        ----------
        _preprocessed : bool
            This attribute presents and is ``True`` after the preparation
            procedures are performed, which indicates that it is ready to
            do the final simulations.
        """
        if hasattr(self, "_preprocessed") and self._preprocessed:
            return
        #
        logger.info("{name}: preprocessing ...".format(name=self.name))
        self._load_template()
        self._load_indexmap()
        self._add_smallscales()
        #
        self._preprocessed = True

    def simulate_frequency(self, frequency):
        """Transform the template map to the requested frequency,
        according to the spectral model and using an spectral index map.
        """
        self.preprocess()
        #
        logger.info("Simulating {name} map at {freq} ({unit}) ...".format(
            name=self.name, freq=frequency, unit=self.freq_unit))
        hpmap_f = (self.template *
                   (frequency / self.template_freq) ** self.indexmap)
        #
        if self.save:
            self.output(hpmap_f, frequency)
        return hpmap_f

    def simulate(self, frequencies):
        """Simulate the synchrotron map at the specified frequencies."""
        hpmaps = []
        for f in np.array(frequencies, ndmin=1):
            hpmap_f = self.simulate_frequency(f)
            hpmaps.append(hpmap_f)
        return hpmaps

    def postprocess(self):
        """Perform the post-simulation operations before the end."""
        pass