From 10f0d7cfdfc55c1a3a7c1e9664c38efbd776b895 Mon Sep 17 00:00:00 2001 From: Aaron LI Date: Fri, 1 Jul 2016 12:10:15 +0800 Subject: Add 'calc_overdensity.py' * Calculate the overdensity profile * Determine the radii: R_{delta} * Calculate the total/gas mass within R_{delta}: Mtotal_{delta}, Mgas_{delta} --- calc_overdensity.py | 264 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 264 insertions(+) create mode 100755 calc_overdensity.py diff --git a/calc_overdensity.py b/calc_overdensity.py new file mode 100755 index 0000000..f7b4d1d --- /dev/null +++ b/calc_overdensity.py @@ -0,0 +1,264 @@ +#!/usr/bin/env python3 +# +# Aaron LI +# Created: 2016-06-30 +# Updated: 2016-06-30 +# + +""" +Calculate the overdensity profile, and from which to calculate the +R_{500} (defined as the radius of the sphere that encloses a mean total +mass density of 500 times the critical density at the cluster's redshift) +and M_gas_{500}/M_{500} (the enclosed gas/total mass by a sphere of radius +R_{500}). + + +References: +[1] Ettori et al., 2013, Space Science Review, 177, 119-154 + + +Sample configuration file: +------------------------------------------------------------ +## Configuration for `calc_overdensity.py` +## Date: 2016-06-30 + +# redshift of the source (critical density) +redshift = + +# gas mass profile +m_gas_profile = mass_gas_profile.txt + +# output total (gravitational) mass profile +m_total_profile = mass_total_profile.txt + +# number of times w.r.t the critical density +delta = 500 + +# output results in JSON format +outfile = overdensity.json + +# output overdensity profile +overdensity_profile = overdensity_profile.txt +------------------------------------------------------------ +""" + +import argparse +import json +from collections import OrderedDict + +import numpy as np +import scipy.optimize as optimize +import astropy.units as au +from astropy.cosmology import FlatLambdaCDM +from configobj import ConfigObj + +import rpy2.robjects as ro +from rpy2.robjects.packages import importr + +from astro_params import AstroParams + + +class MassProfile: + """ + Cluster's gas/total integrated mass profile. + + The total/gravitational mass profile is required to calculate + the overdensity profile, from which the R_{delta} is then determined. + """ + # supported types of mass profile + MASS_TYPES = ["total", "gas"] + # available splines + SPLINES = ["mass", "overdensity"] + # input mass data: [r, r_err, m] + r = None + r_err = None + m = None + # redshift of the object + redshift = None + # fitted smoothing spline + m_spline = None + m_spline_log10 = None + od_spline = None + od_spline_log10 = None + # call R through `rpy2` + mgcv = importr("mgcv") + + def __init__(self, mass, mass_type="total"): + self.load_data(data=mass, mass_type=mass_type) + + def load_data(self, data, mass_type="total"): + if mass_type not in self.MASS_TYPES: + raise ValueError("invalid mass_types: %s" % mass_type) + # 3-column mass profile: [r, r_err, mass] + self.r = data[:, 0].copy() + self.r_err = data[:, 1].copy() + self.m = data[:, 2].copy() + self.mass_type = mass_type + + def calc_overdensity(self, z, verbose=True): + """ + Calculate the overdensity profile from the total/gravitational + mass profile. + + The overdensity is the ratio of the enclosed mean total mass + density to the critical density at the source's redshift. + """ + if self.mass_type != "total": + raise ValueError("total mass profile is required") + # + if verbose: + print("Calculating the overdensity profile ...") + overdensity = np.zeros(self.r.shape) + # critical density + cosmo = FlatLambdaCDM(H0=AstroParams.H0, Om0=AstroParams.OmegaM0) + d_crit = cosmo.critical_density(z).cgs.value # [ g/cm^3 ] + for i, r in enumerate(self.r): + volume = (4.0/3.0) * np.pi * r**3 + overdensity[i] = self.m[i] / volume / d_crit + self.overdensity = overdensity + return overdensity + + def calc_radius_delta(self, delta): + """ + Calculate the radius at which the overdensity is delta. + """ + if self.od_spline is None: + self.fit_spline(spline="overdensity", log10=True) + if min(self.overdensity) > delta: + raise ValueError("min(overdensity) > %d" % delta) + r = optimize.newton( + lambda x: self.eval_spline("overdensity", x) - delta, + x0=500.0*au.kpc.to(au.cm), + tol=1e-2*au.kpc.to(au.cm)) + return r + + def calc_mass_delta(self, r_delta): + if self.m_spline is None: + self.fit_spline(spline="mass", log10=True) + return self.eval_spline(spline="mass", x=r_delta) + + def save(self, outfile): + """ + Save calculated overdensity profile. + """ + data = np.column_stack([self.r, + self.r_err, + self.m]) + header = "radius[cm] radius_err[cm] overdensity" + np.savetxt(outfile, data, header=header) + + def fit_spline(self, spline, log10): + """ + Fit a smoothing line to the specified spline data, + by utilizing the R `mgcv::gam()` function. + + If 'log10' is True, the input data are first applied the log-scale + transform, and then fitted by the smoothing spline. + + The fitted spline allows extrapolation. + """ + if spline not in self.SPLINES: + raise ValueError("invalid spline: %s" % spline) + # + if spline == "mass": + # input gas/total mass profile + if log10: + x = ro.FloatVector(np.log10(self.r)) + y = ro.FloatVector(np.log10(self.m)) + self.m_spline_log10 = True + else: + x = ro.FloatVector(self.r) + y = ro.FloatVector(self.m) + self.m_spline_log10 = False + self.m_spline = self.mgcv.gam( + ro.Formula("y ~ s(x, bs='ps')"), + data=ro.DataFrame({"x": x, "y": y}), + method="REML") + elif spline == "overdensity": + # calculated overdensity profile + if log10: + x = ro.FloatVector(np.log10(self.r)) + y = ro.FloatVector(np.log10(self.overdensity)) + self.od_spline_log10 = True + else: + x = ro.FloatVector(self.radius) + y = ro.FloatVector(self.rho_total) + self.od_spline_log10 = False + self.od_spline = self.mgcv.gam( + ro.Formula("y ~ s(x, bs='ps')"), + data=ro.DataFrame({"x": x, "y": y}), + method="REML") + else: + raise ValueError("invalid spline: %s" % spline) + + def eval_spline(self, spline, x): + """ + Evaluate the specified spline at the supplied positions. + Also check whether the spline was fitted in the log-scale space, + and transform the evaluated values if necessary. + """ + x = np.array(x) + if x.shape == (): + x = x.reshape((1,)) + if spline == "mass": + spl = self.m_spline + log10 = self.m_spline_log10 + elif spline == "overdensity": + spl = self.od_spline + log10 = self.od_spline_log10 + else: + raise ValueError("invalid spline: %s" % spline) + # + if log10: + x_new = ro.ListVector({"x": ro.FloatVector(np.log10(x))}) + y_pred = self.mgcv.predict_gam(spl, newdata=x_new) + y_pred = 10 ** np.array(y_pred) + else: + x_new = ro.ListVector({"x": ro.FloatVector(x)}) + y_pred = self.mgcv.predict_gam(spl, newdata=x_new) + y_pred = np.array(y_pred) + # + if len(y_pred) == 1: + return y_pred[0] + else: + return y_pred + + +def main(): + parser = argparse.ArgumentParser( + description="Calculate overdensity profile and R_{500} etc.") + parser.add_argument("config", nargs="?", default="overdensity.conf", + help="config for overdensity profile and R_{500} " + + "etc. calculation (default: overdensity.conf)") + args = parser.parse_args() + + config = ConfigObj(args.config) + redshift = config.as_float("redshift") + m_gas_data = np.loadtxt(config["m_gas_profile"]) + m_total_data = np.loadtxt(config["m_total_profile"]) + delta = list(map(int, config.as_list("delta"))) + + m_total_profile = MassProfile(mass=m_total_data, mass_type="total") + m_total_profile.calc_overdensity(z=redshift, verbose=True) + m_total_profile.save(outfile=config["overdensity_profile"]) + + m_gas_profile = MassProfile(mass=m_gas_data, mass_type="gas") + + results = OrderedDict() + results["redshift"] = redshift + + for d in delta: + r_delta = m_total_profile.calc_radius_delta(delta=d) + m_total_delta = m_total_profile.calc_mass_delta(r_delta) + m_gas_delta = m_gas_profile.calc_mass_delta(r_delta) + results["R%d[kpc]" % d] = r_delta * au.cm.to(au.kpc) + results["Mtotal%d[Msun]" % d] = m_total_delta * au.g.to(au.solMass) + results["Mgas%d[Msun]" % d] = m_gas_delta * au.g.to(au.solMass) + + results_json = json.dumps(results, indent=2) + print(results_json) + open(config["outfile"], "w").write(results_json+"\n") + + +if __name__ == "__main__": + main() -- cgit v1.2.2