1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
|
#!/usr/bin/env python3
#
# Copyright (c) 2017 Weitian LI <weitian@aaronly.me>
# MIT License
#
"""
Get the slices at the specified redshifts/frequencies from the HI
light-cone cube (created by `make_lightcone.py`), and use linear
interpolation.
"""
import sys
import argparse
import logging
from datetime import datetime, timezone
import numpy as np
from astropy.io import fits
from astropy.cosmology import FlatLambdaCDM
from z2freq import z2freq, freq2z
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
cosmo = FlatLambdaCDM(H0=71, Om0=0.27)
class LightCone:
"""
Light-cone cube mimic the observation of HI signal.
"""
def __init__(self, infile):
with fits.open(infile) as f:
self.data = f[0].data
self.header = f[0].header
logger.info("Loaded light-cone cube: %dx%d (cells) * %d (slices)" %
(self.Nside, self.Nside, self.Nslice))
@property
def Nslice(self):
ns, __, __ = self.data.shape
return ns
@property
def Nside(self):
return self.header["Nside"]
@property
def slices_Dc(self):
"""
The comoving distances of each slice in the light-cone cube.
The slices are evenly distributed along the LoS with equal
comoving step. [Mpc]
"""
Dc_step = self.header["Dc_step"]
Dc_min = self.header["Dc_min"]
Dc = np.array([Dc_min + Dc_step*i for i in range(self.Nslice)])
return Dc
def get_slice(self, z):
Dc = cosmo.comoving_distance(z).value # [Mpc]
slices_Dc = self.slices_Dc
if Dc < slices_Dc.min() or Dc > slices_Dc.max():
raise ValueError("requested redshift out of range: %.2f" % z)
i2 = (slices_Dc <= Dc).sum()
i1 = i2 - 1
Dc1, s1 = slices_Dc[i1], self.data[i1, :, :]
Dc2, s2 = slices_Dc[i2], self.data[i2, :, :]
slope = (s2 - s1) / (Dc2 - Dc1)
return s1 + slope * (Dc - Dc1)
def write_slice(self, outfile, data, z, clobber=False):
freq = z2freq(z)
Dc = cosmo.comoving_distance(z).value # [Mpc]
header = fits.Header()
header["BUNIT"] = (self.header["BUNIT"],
self.header.comments["BUNIT"])
header["Lside"] = (self.header["Lside"],
self.header.comments["Lside"])
header["Nside"] = (self.header["Nside"],
self.header.comments["Lside"])
header["REDSHIFT"] = (z, "redshift of this slice")
header["FREQ"] = (freq, "[MHz] observed HI signal frequency")
header["Dc"] = (Dc, "[cMpc] comoving distance")
header["DATE"] = (datetime.now(timezone.utc).astimezone().isoformat(),
"File creation date")
header.add_history(" ".join(sys.argv))
hdu = fits.PrimaryHDU(data=data, header=header)
try:
hdu.writeto(outfile, overwrite=clobber)
except TypeError:
hdu.writeto(outfile, clobber=clobber)
logger.info("Wrote slice to file: %s" % outfile)
def main():
outfile_pattern = "{prefix}_f{freq:06.2f}_z{z:06.3f}.fits"
outfile_prefix = "deltaTb"
parser = argparse.ArgumentParser(
description="Get slices at requested redshifts/frequencies " +
"from light-cone cube")
parser.add_argument("-C", "--clobber", dest="clobber",
action="store_true",
help="overwrite existing files")
parser.add_argument("-i", "--infile", dest="infile", required=True,
help="input light-cone cube")
parser.add_argument("-o", "--outfile", dest="outfile",
default=outfile_pattern,
help="output image slice filename pattern FITS " +
"(default: %s)" % outfile_pattern)
parser.add_argument("-p", "--prefix", dest="prefix",
default=outfile_prefix,
help="prefix of output slices (default: %s)" %
outfile_prefix)
exgrp = parser.add_mutually_exclusive_group(required=True)
exgrp.add_argument("-z", "--redshifts", dest="redshifts", nargs="+",
help="redshifts where to interpolate slices")
exgrp.add_argument("-f", "--freqs", dest="freqs", nargs="+",
help="21cm frequencies [MHz] to interpolate slices")
args = parser.parse_args()
if args.redshifts:
redshifts = [float(z) for z in args.redshifts]
freqs = z2freq(redshifts, print_=False)
else:
freqs = [float(f) for f in args.freqs]
redshifts = freq2z(freqs, print_=False)
lightcone = LightCone(args.infile)
for z, f in zip(redshifts, freqs):
outfile = args.outfile.format(prefix=args.prefix, z=z, freq=f)
logger.info("z=%06.3f, freq=%06.2f MHz : %s ..." % (z, f, outfile))
data = lightcone.get_slice(z)
lightcone.write_slice(outfile, data=data, z=z, clobber=args.clobber)
if __name__ == "__main__":
main()
|