1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
|
#!/usr/bin/env python3
#
# Copyright (c) 2017 Weitian LI <weitian@aaronly.me>
# MIT License
#
"""
Convert the 21cmFAST simulated and generated lighttravel cubes into the
FITS format with proper headers that are consistent with the lightcone
created by ``make_lightcone.py``, and therefore ``get_slice_zfreq.py``
can be used to extract needed slices.
The 21cmFAST lighttravel cube is created by the tool
``redshift_interpolate_boxes.c`` shipped with 21cmFAST.
For example, Mesinger et al. (2016) released their simulations at:
http://homepage.sns.it/mesinger/EOS.html
where the *faint galaxies* model is recommended by the authors.
The lighttravel cubes have filenames:
``delta_T_v3_no_halos__zstart*_zend*_FLIPBOXES0_1024_1600Mpc_lighttravel``
data type: float32, little endidian, C ordering
cube dimension: 1024x1024 (XY-spatial), 1024 (LoS/z)
length: 1600 comoving Mpc
unit: mK
.. 21cmFAST: https://github.com/andreimesinger/21cmFAST
.. Mesinger et al. 2016, MNRAS, 459, 2342
"""
import os
import sys
import argparse
import logging
from datetime import datetime, timezone
import numpy as np
from astropy.io import fits
from astropy.wcs import WCS
from astropy.cosmology import FlatLambdaCDM
import astropy.units as au
logging.basicConfig(level=logging.INFO,
format="[%(levelname)s:%(lineno)d] %(message)s")
logger = logging.getLogger()
class LightCone:
"""
Light-cone cube mimic the observation of HI signal.
cube shape:
* [X, Y, LoS] (axes_swapped = False)
* [LoS, Y, Y] (axes_swapped = True)
"""
def __init__(self, cube, zmin, zmax, unit, Lside, cosmo,
axes_swapped=False):
self.cube = cube
self.zmin = zmin
self.zmax = zmax
self.unit = unit
self.Lside = Lside # [cMpc] simulation side length
self.cosmo = cosmo
self.Dc_min = cosmo.comoving_distance(zmin).value # [cMpc]
self.Dc_max = cosmo.comoving_distance(zmax).value # [cMpc]
self.axes_swapped = axes_swapped
def swap_axes(self):
self.cube = np.swapaxes(self.cube, 0, 2)
self.axes_swapped = not self.axes_swapped
logger.info("Axes swapped: %s" % self.axes_swapped)
if self.axes_swapped:
logger.info("Cube axes: [LoS, Y, X]")
else:
logger.info("Cube axes: [X, Y, LoS]")
@property
def Nside(self):
return self.cube.shape[1]
@property
def Nslice(self):
if self.axes_swapped:
return self.cube.shape[0]
else:
return self.cube.shape[2]
@property
def Dc_cell(self):
return self.Lside / self.Nslice
@property
def wcs(self):
w = WCS(naxis=3)
w.wcs.ctype = ["pixel", "pixel", "pixel"]
w.wcs.cunit = ["Mpc", "Mpc", "Mpc"] # comoving
w.wcs.crpix = np.array([1.0, 1.0, 1.0])
w.wcs.crval = np.array([0.0, 0.0, self.Dc_min])
w.wcs.cdelt = np.array([self.Dc_cell, self.Dc_cell, self.Dc_cell])
return w
@property
def header(self):
dDc = self.Dc_cell
header = fits.Header()
header["BUNIT"] = (str(self.unit), "Data unit")
header["zmin"] = (self.zmin, "HI simulation minimum redshift")
header["zmax"] = (self.zmax, "HI simulation maximum redshift")
header["Dc_min"] = (self.Dc_min, "[cMpc] comoving distance at zmin")
header["Dc_max"] = (self.Dc_max, "[cMpc] comoving distance at zmax")
header["Dc_step"] = (dDc, "[cMpc] comoving distance between slices")
header["Lside"] = (self.Lside, "[cMpc] Simulation side length")
header["Nside"] = (self.Nside, "Number of cells at each side")
header["DATE"] = (datetime.now(timezone.utc).astimezone().isoformat(),
"File creation date")
header.add_history(" ".join(sys.argv))
header.extend(self.wcs.to_header(), update=True)
return header
def write(self, outfile, clobber=False):
hdu = fits.PrimaryHDU(data=self.cube, header=self.header)
logger.info("Created FITS object, writing to disk ...")
try:
hdu.writeto(outfile, overwrite=clobber)
except TypeError:
hdu.writeto(outfile, clobber=clobber)
logger.info("Wrote light-cone cube to: %s" % outfile)
def main():
parser = argparse.ArgumentParser(
description="convert 21cmFAST lighttravel cube to FITS lightcone cube")
parser.add_argument("-C", "--clobber", dest="clobber",
action="store_true", help="overwrite existing file")
parser.add_argument("--data-type", dest="data_type", default="float32",
help="input cube data type (default: float32)")
parser.add_argument("--unit", dest="unit", default="mK",
help="input cube data unit (default: mK)")
parser.add_argument("--unit-out", dest="unit_out", default="K",
help="output data unit (default: K)")
parser.add_argument("--side-length", dest="side_length",
type=float, default=1600.0,
help="input cube simulation side length [cMpc]")
parser.add_argument("--dimension", dest="dimension",
nargs=3, type=int, default=[1024, 1024, 1024],
help="input cube dimension (C ordering assumed)")
parser.add_argument("--H0", dest="H0", type=float, default=67.8,
help="simulation adopted H0 (default: 67.8)")
parser.add_argument("--omega-m0", dest="Om0", type=float, default=0.308,
help="simulation adopted OmegaM0 (default: 0.308)")
parser.add_argument("--omega-b0", dest="Ob0", type=float, default=0.0484,
help="simulation adopted Omegab0 (default: 0.0484)")
parser.add_argument("-z", "--z-min", dest="zmin", type=float,
required=True, help="minimum/beginning redshift")
parser.add_argument("-Z", "--z-max", dest="zmax", type=float,
required=True, help="maximum/end redshift")
parser.add_argument("infile", help="input 21cmFAST lighttravel cube")
parser.add_argument("outfile", help="output FITS lightcone cube")
args = parser.parse_args()
if os.path.exists(args.outfile):
if args.clobber:
os.remove(args.outfile)
logger.warning("Removed existing output file: %s" % args.outfile)
else:
raise OSError("output file already exists: %s" % args.outfile)
cosmo = FlatLambdaCDM(H0=args.H0, Om0=args.Om0, Ob0=args.Ob0)
logger.info("Cosmology: {0}".format(cosmo))
unit_in = au.Unit(args.unit)
unit_out = au.Unit(args.unit_out)
logger.info("Unit: %s (input) -> %s (output)" % (unit_in, unit_out))
cube_in = np.fromfile(args.infile, dtype=np.dtype(args.data_type))
cube_in = cube_in.reshape(args.dimension)
logger.info("Loaded lighttravel cube from file: %s" % args.infile)
logger.info("Data type: {0}; dimension: {1}".format(
cube_in.dtype, cube_in.shape))
cube_in *= unit_in.to(unit_out)
logger.info("Converted unit to: %s" % unit_out)
lightcone = LightCone(cube_in, zmin=args.zmin, zmax=args.zmax,
unit=unit_out, Lside=args.side_length,
cosmo=cosmo, axes_swapped=False)
lightcone.swap_axes()
lightcone.write(args.outfile)
if __name__ == "__main__":
main()
|