summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAaron LI <aaronly.me@outlook.com>2016-06-28 17:03:24 +0800
committerAaron LI <aaronly.me@outlook.com>2016-06-28 17:03:24 +0800
commit04b8cfc4116d3ca44cdfac476b584a3b8f7ca399 (patch)
tree9ce2f53b9f48f46278c98c1d8e0c93d2395c35ec
parent05c140ff75607b33f0ac25c06c593df1fcdbe07d (diff)
downloadcexcess-04b8cfc4116d3ca44cdfac476b584a3b8f7ca399.tar.bz2
calc_mass_potential.py: use R "mgcv::gam()" to fit smoothing splines
-rwxr-xr-xcalc_mass_potential.py321
1 files changed, 234 insertions, 87 deletions
diff --git a/calc_mass_potential.py b/calc_mass_potential.py
index 731ae43..9348319 100755
--- a/calc_mass_potential.py
+++ b/calc_mass_potential.py
@@ -2,9 +2,11 @@
#
# Aaron LI
# Created: 2016-06-24
-# Updated: 2016-06-27
+# Updated: 2016-06-28
#
# Change logs:
+# 2016-06-28:
+# * Fit smoothing splines to profiles using R `mgcv::gam()`
# 2016-06-27:
# * Implement potential profile calculation:
# methods 'calc_density_total()' and 'calc_potential()'
@@ -66,7 +68,6 @@ For example:
which is consistent with the formula of (ref.[2], eq.(3))
------------------------------------------------------------
-
Gravitational potential profile calculation:
Newton's theorems (ref.[3], Sec. 2.2.1):
@@ -123,7 +124,7 @@ t_profile = t_profile.txt
# unit of the T profile radius (default: pixel)
t_unit = "pixel"
-# number of data points for the output profile calculation
+# number of data points for the output profiles (interpolation)
num_dp = 1000
# output gas mass profile
@@ -150,6 +151,9 @@ import scipy.integrate as integrate
from scipy.misc import derivative
from configobj import ConfigObj
+import rpy2.robjects as ro
+from rpy2.robjects.packages import importr
+
from astro_params import AstroParams, ChandraPixel
from projection import Projection
@@ -168,6 +172,10 @@ class DensityProfile:
should have unit [ cm ]
* The density should have unit [ cm^-3 ] or [ g cm^-3 ]
"""
+ # available splines
+ SPLINES = ["density", "electron", "rho_gas",
+ "cooling_function", "temperature",
+ "mass_total", "rho_total"]
# allowed density profile types
DENSITY_TYPES = ["electron", "gas"]
# input density data: [r, r_err, d]
@@ -184,10 +192,6 @@ class DensityProfile:
# temperature profile
t_radius = None
t_value = None
- # fitted spline to the profile
- d_spline = None
- cf_spline = None
- t_spline = None
# generated radial data points for profile calculation
radius = None
radius_err = None
@@ -199,6 +203,23 @@ class DensityProfile:
rho_total = None
# potential profile (cut at the largest available radius)
potential = None
+ # fitted spline to the profiles
+ d_spline = None
+ d_spline_log10 = None
+ ne_spline = None
+ ne_spline_log10 = None
+ rho_gas_spline = None
+ rho_gas_spline_log10 = None
+ cf_spline = None
+ cf_spline_log10 = None
+ t_spline = None
+ t_spline_log10 = None
+ m_total_spline = None
+ m_total_spline_log10 = None
+ rho_total_spline = None
+ rho_total_spline_log10 = None
+ # call R through `rpy2`
+ mgcv = importr("mgcv")
def __init__(self, density, density_type="electron"):
self.load_data(data=density, density_type=density_type)
@@ -222,88 +243,211 @@ class DensityProfile:
self.t_radius = data[:, 0].copy()
self.t_value = data[:, 1].copy()
- def calc_brightness(self):
- """
- Project the electron number density or gas mass density profile
- to calculate the 2D surface brightness profile.
- """
- if self.cf_radius is None or self.cf_value is None:
- raise ValueError("cooling function profile missing")
- ne = self.calc_density_electron()
- # flux per unit volume
- flux = self.cf_spline(self.r) * ne ** 2 / AstroParams.ratio_ne_np
- # project the 3D flux
- projector = Projection(rout=self.r+self.r_err)
- brightness = projector.project(flux)
- return brightness
-
def calc_density_electron(self):
"""
Calculate the electron number density from the gas mass density
- if necessary.
+ if necessary, and fit a smoothing spline to it.
"""
if self.density_type == "electron":
self.ne = self.d.copy()
elif self.density_type == "gas":
self.ne = self.d / AstroParams.m_atom / AstroParams.mu_e
+ # fit a smoothing spline
+ self.fit_spline(spline="electron", log10=True)
return self.ne
def calc_density_gas(self):
"""
Calculate the gas mass density from the electron number density
- if necessary.
+ if necessary, and fit a smoothing spline to it.
"""
if self.density_type == "electron":
self.rho_gas = self.d * AstroParams.mu_e * AstroParams.m_atom
elif self.density_type == "gas":
self.rho_gas = self.d.copy()
+ # fit a smoothing spline
+ self.fit_spline(spline="rho_gas", log10=True)
return self.rho_gas
- def fit_spline(self, verbose=False):
+ def calc_brightness(self):
+ """
+ Project the electron number density or gas mass density profile
+ to calculate the 2D surface brightness profile.
"""
- Fit smoothing spline to the density profile, cooling function
- profile, and temperature profile.
+ if self.cf_radius is None or self.cf_value is None:
+ raise ValueError("cooling function profile missing")
+ if self.cf_spline is None:
+ self.fit_spline(spline="cooling_function", log10=False)
+ #
+ ne = self.calc_density_electron()
+ # flux per unit volume
+ cf_new = self.eval_spline(spline="cooling_function", x=self.r)
+ flux = cf_new * ne ** 2 / AstroParams.ratio_ne_np
+ # project the 3D flux
+ projector = Projection(rout=self.r+self.r_err)
+ brightness = projector.project(flux)
+ return brightness
- NOTE:
- * Simple interpolation (`scipy.interpolate.interp1d` of kinds
- `linear` or `quadratic` or `cubic`) may lead to a severely
- oscillating curve (e.g., electron density profile), which
- further cause problems for following mass calculation
- due to the needs to take the derivative.
- * `InterpolatedUnivariateSpline` or `UnivariateSpline` is a
- better choice than the above `interp1d`.
- * Allow cooling function profile and temperature profile to be
- extrapolated by filling with the last value if necessary.
+ def fit_spline(self, spline, log10):
"""
- # density profile
- # insert a data point at radius of zero
- if verbose:
- print("Fitting spline to density profile ...")
- self.d_spline = interpolate.InterpolatedUnivariateSpline(
- x=np.concatenate([[0.0], self.r]),
- y=np.concatenate([[self.d[0]], self.d]))
- if self.ne is not None:
- if verbose:
- print("Fitting spline to electron number density profile ...")
- self.ne_spline = interpolate.InterpolatedUnivariateSpline(
- x=np.concatenate([[0.0], self.r]),
- y=np.concatenate([[self.ne[0]], self.ne]))
- if self.rho_gas is not None:
- if verbose:
- print("Fitting spline to gas mass density profile ...")
- self.rho_gas_spline = interpolate.InterpolatedUnivariateSpline(
- x=np.concatenate([[0.0], self.r]),
- y=np.concatenate([[self.rho_gas[0]], self.rho_gas]))
- # cooling function profile
- if verbose:
- print("Fitting spline to cooling function profile ...")
- self.cf_spline = interpolate.InterpolatedUnivariateSpline(
- x=self.cf_radius, y=self.cf_value, ext="const")
- # temperature profile
- if verbose:
- print("Fitting spline to temperature profile ...")
- self.t_spline = interpolate.InterpolatedUnivariateSpline(
- x=self.t_radius, y=self.t_value, ext="const")
+ Fit a smoothing line to the specified spline data,
+ by utilizing the R `mgcv::gam()` function.
+
+ If 'log10' is True, the input data are first applied the log-scale
+ transform, and then fitted by the smoothing spline.
+
+ The fitted spline allows extrapolation.
+ """
+ if spline not in self.SPLINES:
+ raise ValueError("invalid spline: %s" % spline)
+ #
+ if spline == "density":
+ # given density profile (either electron / gas mass density)
+ if log10:
+ x = ro.FloatVector(np.log10(self.r))
+ y = ro.FloatVector(np.log10(self.d))
+ self.d_spline_log10 = True
+ else:
+ x = ro.FloatVector(self.r)
+ y = ro.FloatVector(self.s)
+ self.d_spline_log10 = False
+ self.d_spline = self.mgcv.gam(
+ ro.Formula("y ~ s(x, bs='ps')"),
+ data=ro.DataFrame({"x": x, "y": y}),
+ method="REML")
+ elif spline == "electron":
+ # input electron number density profile
+ if log10:
+ x = ro.FloatVector(np.log10(self.r))
+ y = ro.FloatVector(np.log10(self.ne))
+ self.ne_spline_log10 = True
+ else:
+ x = ro.FloatVector(self.r)
+ y = ro.FloatVector(self.ne)
+ self.ne_spline_log10 = False
+ self.ne_spline = self.mgcv.gam(
+ ro.Formula("y ~ s(x, bs='ps')"),
+ data=ro.DataFrame({"x": x, "y": y}),
+ method="REML")
+ elif spline == "rho_gas":
+ # input gas mass density profile
+ if log10:
+ x = ro.FloatVector(np.log10(self.r))
+ y = ro.FloatVector(np.log10(self.rho_gas))
+ self.rho_gas_spline_log10 = True
+ else:
+ x = ro.FloatVector(self.r)
+ y = ro.FloatVector(self.rho_gas)
+ self.rho_gas_spline_log10 = False
+ self.rho_gas_spline = self.mgcv.gam(
+ ro.Formula("y ~ s(x, bs='ps')"),
+ data=ro.DataFrame({"x": x, "y": y}),
+ method="REML")
+ elif spline == "cooling_function":
+ # input cooling function profile
+ if log10:
+ x = ro.FloatVector(np.log10(self.cf_radius))
+ y = ro.FloatVector(np.log10(self.cf_value))
+ self.cf_spline_log10 = True
+ else:
+ x = ro.FloatVector(self.cf_radius)
+ y = ro.FloatVector(self.cf_value)
+ self.cf_spline_log10 = False
+ self.cf_spline = self.mgcv.gam(
+ ro.Formula("y ~ s(x, bs='ps')"),
+ data=ro.DataFrame({"x": x, "y": y}),
+ method="REML")
+ elif spline == "temperature":
+ # input temperature profile
+ if log10:
+ x = ro.FloatVector(np.log10(self.t_radius))
+ y = ro.FloatVector(np.log10(self.t_value))
+ self.t_spline_log10 = True
+ else:
+ x = ro.FloatVector(self.t_radius)
+ y = ro.FloatVector(self.t_value)
+ self.t_spline_log10 = False
+ self.t_spline = self.mgcv.gam(
+ ro.Formula("y ~ s(x, bs='ps')"),
+ data=ro.DataFrame({"x": x, "y": y}),
+ method="REML")
+ elif spline == "mass_total":
+ # calculated total/gravitational mass profile
+ if log10:
+ x = ro.FloatVector(np.log10(self.radius))
+ y = ro.FloatVector(np.log10(self.m_total))
+ self.m_total_spline_log10 = True
+ else:
+ x = ro.FloatVector(self.radius)
+ y = ro.FloatVector(self.m_total)
+ self.m_total_spline_log10 = False
+ self.m_total_spline = self.mgcv.gam(
+ ro.Formula("y ~ s(x, bs='ps')"),
+ data=ro.DataFrame({"x": x, "y": y}),
+ method="REML")
+ elif spline == "rho_total":
+ # calculated total mass density profile
+ if log10:
+ x = ro.FloatVector(np.log10(self.radius))
+ y = ro.FloatVector(np.log10(self.rho_total))
+ self.rho_total_spline_log10 = True
+ else:
+ x = ro.FloatVector(self.radius)
+ y = ro.FloatVector(self.rho_total)
+ self.rho_total_spline_log10 = False
+ self.rho_total_spline = self.mgcv.gam(
+ ro.Formula("y ~ s(x, bs='ps')"),
+ data=ro.DataFrame({"x": x, "y": y}),
+ method="REML")
+ else:
+ raise ValueError("invalid spline: %s" % spline)
+
+ def eval_spline(self, spline, x):
+ """
+ Evaluate the specified spline at the supplied positions.
+ Also check whether the spline was fitted in the log-scale space,
+ and transform the evaluated values if necessary.
+ """
+ x = np.array(x)
+ if x.shape == ():
+ x = x.reshape((1,))
+ if spline == "density":
+ spl = self.d_spline
+ log10 = self.d_spline_log10
+ elif spline == "electron":
+ spl = self.ne_spline
+ log10 = self.ne_spline_log10
+ elif spline == "rho_gas":
+ spl = self.rho_gas_spline
+ log10 = self.rho_gas_spline_log10
+ elif spline == "cooling_function":
+ spl = self.cf_spline
+ log10 = self.cf_spline_log10
+ elif spline == "temperature":
+ spl = self.t_spline
+ log10 = self.t_spline_log10
+ elif spline == "mass_total":
+ spl = self.m_total_spline
+ log10 = self.m_total_spline_log10
+ elif spline == "rho_total":
+ spl = self.rho_total_spline
+ log10 = self.rho_total_spline_log10
+ else:
+ raise ValueError("invalid spline: %s" % spline)
+ #
+ if log10:
+ x_new = ro.ListVector({"x": ro.FloatVector(np.log10(x))})
+ y_pred = self.mgcv.predict_gam(spl, newdata=x_new)
+ y_pred = 10 ** np.array(y_pred)
+ else:
+ x_new = ro.ListVector({"x": ro.FloatVector(x)})
+ y_pred = self.mgcv.predict_gam(spl, newdata=x_new)
+ y_pred = np.array(y_pred)
+ #
+ if len(y_pred) == 1:
+ return y_pred[0]
+ else:
+ return y_pred
def gen_radius(self, num=1000):
"""
@@ -341,7 +485,7 @@ class DensityProfile:
Reference: ref.[1], eq.(9)
"""
def _f_rho_gas(r):
- return self.rho_gas_spline(r) * 4*np.pi * r**2
+ return 4*np.pi * r**2 * self.eval_spline(spline="rho_gas", x=r)
#
m_gas = np.zeros(self.radius.shape)
if verbose:
@@ -368,6 +512,8 @@ class DensityProfile:
"""
if self.t_radius is None or self.t_value is None:
raise ValueError("temperature profile required")
+ if self.t_spline is None:
+ self.fit_spline(spline="temperature", log10=False)
#
# calculate the coefficient of the total mass formula
# M0 = (k_B * T_gas(r) * r) / (mu * m_atom * G)
@@ -380,12 +526,15 @@ class DensityProfile:
for i, r in enumerate(self.radius):
if verbose and (i+1) % 100 == 0:
print("%d..." % (i+1), end="", flush=True)
+ T = self.eval_spline(spline="temperature", x=r)
+ dT_dr = derivative(lambda r: self.eval_spline("temperature", r),
+ r, dx=0.01*au.kpc.to(au.cm))
+ ne = self.eval_spline(spline="electron", x=r)
+ dne_dr = derivative(lambda r: self.eval_spline("electron", r),
+ r, dx=0.01*au.kpc.to(au.cm))
# enclosed total mass [ g ]
- m_total[i] = - M0 * self.t_spline(r) * r * (
- ((r / self.t_spline(r)) *
- derivative(self.t_spline, r, dx=0.01*au.kpc.to(au.cm))) +
- ((r / self.ne_spline(r)) *
- derivative(self.ne_spline, r, dx=0.01*au.kpc.to(au.cm))))
+ m_total[i] = - M0 * T * r * (((r / T) * dT_dr) +
+ ((r / ne) * dne_dr))
if verbose:
print("DONE!", flush=True)
self.m_total = m_total
@@ -396,21 +545,18 @@ class DensityProfile:
Calculate the total mass density profile, which is required to
calculate the following gravitational potential profile.
"""
- rho_total = np.zeros(self.radius.shape)
+ if self.m_total_spline is None:
+ self.fit_spline(spline="mass_total", log10=True)
+ #
if verbose:
print("Calculating the total mass density profile ...")
- print(">>> Fitting spline to total mass profile ...")
- self.m_total_spline = interpolate.InterpolatedUnivariateSpline(
- x=self.radius, y=self.m_total, ext="const")
+ rho_total = np.zeros(self.radius.shape)
for i, r in enumerate(self.radius):
- rho_total[i] = (derivative(self.m_total_spline, r,
- dx=0.01*au.kpc.to(au.cm)) /
- (4 * np.pi * r**2))
+ dM_dr = derivative(lambda r: self.eval_spline("mass_total", r),
+ r, dx=0.01*au.kpc.to(au.cm))
+ rho_total[i] = (dM_dr / (4 * np.pi * r**2))
self.rho_total = rho_total
- if verbose:
- print(">>> Fitting spline to total mass density profile ...")
- self.rho_total_spline = interpolate.InterpolatedUnivariateSpline(
- x=self.radius, y=rho_total, ext="const")
+ return rho_total
def calc_potential(self, verbose=True):
"""
@@ -423,20 +569,22 @@ class DensityProfile:
potentials, but not the shape of the potential profile.
"""
def _int_inner(x):
- return x**2 * self.rho_total_spline(x)
+ return x**2 * self.eval_spline(spline="rho_total", x=x)
def _int_outer(x):
- return x * self.rho_total_spline(x)
+ return x * self.eval_spline(spline="rho_total", x=x)
if self.rho_total is None:
self.calc_density_total(verbose=verbose)
+ if self.rho_total_spline is None:
+ self.fit_spline(spline="rho_total", log10=True)
potential = np.zeros(self.radius.shape)
if verbose:
print("Calculating the potential profile (#%d): ..." %
len(self.radius), end="", flush=True)
r_max = max(self.radius)
for i, r in enumerate(self.radius):
- if verbose and (i+1) % 50 == 0:
+ if verbose and (i+1) % 10 == 0:
print("%d..." % (i+1), end="", flush=True)
potential[i] = - 4 * np.pi * ac.G.cgs.value * (
(1/r) * integrate.quad(_int_inner, a=0.0, b=r,
@@ -524,7 +672,6 @@ def main():
density_profile.load_t_profile(t_profile)
density_profile.calc_density_electron()
density_profile.calc_density_gas()
- density_profile.fit_spline(verbose=True)
density_profile.gen_radius(num=config.as_int("num_dp"))
density_profile.calc_mass_gas(verbose=True)
density_profile.save(profile="mass_gas",