1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
|
#!/usr/bin/env python3
#
# Aaron LI
# Created: 2016-06-30
# Updated: 2016-07-11
#
# Change logs:
# 2016-07-11:
# * Use a default config to allow a minimal user config
# 2016-07-04:
# * Fix a bug with wrong variable
# * Update radii to unit "kpc" and mass to unit "Msun"
#
"""
Calculate the overdensity profile, and from which to calculate the
R_{500} (defined as the radius of the sphere that encloses a mean total
mass density of 500 times the critical density at the cluster's redshift)
and M_gas_{500}/M_{500} (the enclosed gas/total mass by a sphere of radius
R_{500}).
References:
[1] Ettori et al., 2013, Space Science Review, 177, 119-154
"""
import argparse
import json
from collections import OrderedDict
import numpy as np
import scipy.optimize as optimize
import astropy.units as au
from astropy.cosmology import FlatLambdaCDM
from configobj import ConfigObj
import rpy2.robjects as ro
from rpy2.robjects.packages import importr
from astro_params import AstroParams
config_default = """
## Configuration for `calc_overdensity.py`
# redshift of the source (critical density)
#redshift = <REDSHIFT>
# gas mass profile
m_gas_profile = mass_gas_profile.txt
# output total (gravitational) mass profile
m_total_profile = mass_total_profile.txt
# number of times w.r.t the critical density
delta = 1500, 500, 200
# output results in JSON format
outfile = overdensity.json
# output overdensity profile
overdensity_profile = overdensity_profile.txt
"""
class MassProfile:
"""
Cluster's gas/total integrated mass profile.
The total/gravitational mass profile is required to calculate
the overdensity profile, from which the R_{delta} is then determined.
"""
# supported types of mass profile
MASS_TYPES = ["total", "gas"]
# available splines
SPLINES = ["mass", "overdensity"]
# input mass data: [r, r_err, m]
r = None
r_err = None
m = None
# redshift of the object
redshift = None
# fitted smoothing spline
m_spline = None
m_spline_log10 = None
od_spline = None
od_spline_log10 = None
# call R through `rpy2`
mgcv = importr("mgcv")
def __init__(self, mass, mass_type="total"):
self.load_data(data=mass, mass_type=mass_type)
def load_data(self, data, mass_type="total"):
if mass_type not in self.MASS_TYPES:
raise ValueError("invalid mass_types: %s" % mass_type)
# 3-column mass profile: r[kpc], r_err[kpc], mass[Msun]
self.r = data[:, 0].copy()
self.r_err = data[:, 1].copy()
self.m = data[:, 2].copy()
self.mass_type = mass_type
def calc_overdensity(self, z, verbose=True):
"""
Calculate the overdensity profile from the total/gravitational
mass profile.
The overdensity is the ratio of the enclosed mean total mass
density to the critical density at the source's redshift.
"""
if self.mass_type != "total":
raise ValueError("total mass profile is required")
#
if verbose:
print("Calculating the overdensity profile ...")
overdensity = np.zeros(self.r.shape)
# critical density
cosmo = FlatLambdaCDM(H0=AstroParams.H0, Om0=AstroParams.OmegaM0)
d_crit = cosmo.critical_density(z).cgs.value # [ g/cm^3 ]
for i, r in enumerate(self.r):
volume = (4.0/3.0) * np.pi * (r*au.kpc.to(au.cm))**3
mass = self.m[i] * au.solMass.to(au.g)
overdensity[i] = mass / volume / d_crit
self.overdensity = overdensity
return overdensity
def calc_radius_delta(self, delta):
"""
Calculate the radius at which the overdensity is delta.
"""
if self.od_spline is None:
self.fit_spline(spline="overdensity", log10=True)
if min(self.overdensity) > delta:
raise ValueError("min(overdensity) > %d" % delta)
r = optimize.newton(
lambda x: self.eval_spline("overdensity", x) - delta,
x0=500.0, tol=1e-2)
return r
def calc_mass_delta(self, r_delta):
if self.m_spline is None:
self.fit_spline(spline="mass", log10=True)
return self.eval_spline(spline="mass", x=r_delta)
def save(self, outfile):
"""
Save calculated overdensity profile.
"""
data = np.column_stack([self.r,
self.r_err,
self.overdensity])
header = "radius[kpc] radius_err[kpc] overdensity"
np.savetxt(outfile, data, header=header)
def fit_spline(self, spline, log10):
"""
Fit a smoothing line to the specified spline data,
by utilizing the R `mgcv::gam()` function.
If 'log10' is True, the input data are first applied the log-scale
transform, and then fitted by the smoothing spline.
The fitted spline allows extrapolation.
"""
if spline not in self.SPLINES:
raise ValueError("invalid spline: %s" % spline)
#
if spline == "mass":
# input gas/total mass profile
if log10:
x = ro.FloatVector(np.log10(self.r))
y = ro.FloatVector(np.log10(self.m))
self.m_spline_log10 = True
else:
x = ro.FloatVector(self.r)
y = ro.FloatVector(self.m)
self.m_spline_log10 = False
self.m_spline = self.mgcv.gam(
ro.Formula("y ~ s(x, bs='ps')"),
data=ro.DataFrame({"x": x, "y": y}),
method="REML")
elif spline == "overdensity":
# calculated overdensity profile
if log10:
x = ro.FloatVector(np.log10(self.r))
y = ro.FloatVector(np.log10(self.overdensity))
self.od_spline_log10 = True
else:
x = ro.FloatVector(self.radius)
y = ro.FloatVector(self.rho_total)
self.od_spline_log10 = False
self.od_spline = self.mgcv.gam(
ro.Formula("y ~ s(x, bs='ps')"),
data=ro.DataFrame({"x": x, "y": y}),
method="REML")
else:
raise ValueError("invalid spline: %s" % spline)
def eval_spline(self, spline, x):
"""
Evaluate the specified spline at the supplied positions.
Also check whether the spline was fitted in the log-scale space,
and transform the evaluated values if necessary.
"""
x = np.array(x)
if x.shape == ():
x = x.reshape((1,))
if spline == "mass":
spl = self.m_spline
log10 = self.m_spline_log10
elif spline == "overdensity":
spl = self.od_spline
log10 = self.od_spline_log10
else:
raise ValueError("invalid spline: %s" % spline)
#
if log10:
x_new = ro.ListVector({"x": ro.FloatVector(np.log10(x))})
y_pred = self.mgcv.predict_gam(spl, newdata=x_new)
y_pred = 10 ** np.array(y_pred)
else:
x_new = ro.ListVector({"x": ro.FloatVector(x)})
y_pred = self.mgcv.predict_gam(spl, newdata=x_new)
y_pred = np.array(y_pred)
#
if len(y_pred) == 1:
return y_pred[0]
else:
return y_pred
def main():
parser = argparse.ArgumentParser(
description="Calculate overdensity profile and R_{500} etc.")
parser.add_argument("config", nargs="?", default="overdensity.conf",
help="config for overdensity profile and R_{500} " +
"etc. calculation (default: overdensity.conf)")
args = parser.parse_args()
config = ConfigObj(config_default.splitlines())
config_user = ConfigObj(args.config)
config.merge(config_user)
redshift = config.as_float("redshift")
m_gas_data = np.loadtxt(config["m_gas_profile"])
m_total_data = np.loadtxt(config["m_total_profile"])
delta = list(map(int, config.as_list("delta")))
m_total_profile = MassProfile(mass=m_total_data, mass_type="total")
m_total_profile.calc_overdensity(z=redshift, verbose=True)
m_total_profile.save(outfile=config["overdensity_profile"])
m_gas_profile = MassProfile(mass=m_gas_data, mass_type="gas")
results = OrderedDict()
results["redshift"] = redshift
for d in delta:
r_delta = m_total_profile.calc_radius_delta(delta=d)
m_total_delta = m_total_profile.calc_mass_delta(r_delta)
m_gas_delta = m_gas_profile.calc_mass_delta(r_delta)
results["R%d[kpc]" % d] = r_delta
results["Mtotal%d[Msun]" % d] = m_total_delta
results["Mgas%d[Msun]" % d] = m_gas_delta
results_json = json.dumps(results, indent=2)
print(results_json)
open(config["outfile"], "w").write(results_json+"\n")
if __name__ == "__main__":
main()
|