diff options
Diffstat (limited to 'calc_mass_potential.py')
-rwxr-xr-x | calc_mass_potential.py | 321 |
1 files changed, 234 insertions, 87 deletions
diff --git a/calc_mass_potential.py b/calc_mass_potential.py index 731ae43..9348319 100755 --- a/calc_mass_potential.py +++ b/calc_mass_potential.py @@ -2,9 +2,11 @@ # # Aaron LI # Created: 2016-06-24 -# Updated: 2016-06-27 +# Updated: 2016-06-28 # # Change logs: +# 2016-06-28: +# * Fit smoothing splines to profiles using R `mgcv::gam()` # 2016-06-27: # * Implement potential profile calculation: # methods 'calc_density_total()' and 'calc_potential()' @@ -66,7 +68,6 @@ For example: which is consistent with the formula of (ref.[2], eq.(3)) ------------------------------------------------------------ - Gravitational potential profile calculation: Newton's theorems (ref.[3], Sec. 2.2.1): @@ -123,7 +124,7 @@ t_profile = t_profile.txt # unit of the T profile radius (default: pixel) t_unit = "pixel" -# number of data points for the output profile calculation +# number of data points for the output profiles (interpolation) num_dp = 1000 # output gas mass profile @@ -150,6 +151,9 @@ import scipy.integrate as integrate from scipy.misc import derivative from configobj import ConfigObj +import rpy2.robjects as ro +from rpy2.robjects.packages import importr + from astro_params import AstroParams, ChandraPixel from projection import Projection @@ -168,6 +172,10 @@ class DensityProfile: should have unit [ cm ] * The density should have unit [ cm^-3 ] or [ g cm^-3 ] """ + # available splines + SPLINES = ["density", "electron", "rho_gas", + "cooling_function", "temperature", + "mass_total", "rho_total"] # allowed density profile types DENSITY_TYPES = ["electron", "gas"] # input density data: [r, r_err, d] @@ -184,10 +192,6 @@ class DensityProfile: # temperature profile t_radius = None t_value = None - # fitted spline to the profile - d_spline = None - cf_spline = None - t_spline = None # generated radial data points for profile calculation radius = None radius_err = None @@ -199,6 +203,23 @@ class DensityProfile: rho_total = None # potential profile (cut at the largest available radius) potential = None + # fitted spline to the profiles + d_spline = None + d_spline_log10 = None + ne_spline = None + ne_spline_log10 = None + rho_gas_spline = None + rho_gas_spline_log10 = None + cf_spline = None + cf_spline_log10 = None + t_spline = None + t_spline_log10 = None + m_total_spline = None + m_total_spline_log10 = None + rho_total_spline = None + rho_total_spline_log10 = None + # call R through `rpy2` + mgcv = importr("mgcv") def __init__(self, density, density_type="electron"): self.load_data(data=density, density_type=density_type) @@ -222,88 +243,211 @@ class DensityProfile: self.t_radius = data[:, 0].copy() self.t_value = data[:, 1].copy() - def calc_brightness(self): - """ - Project the electron number density or gas mass density profile - to calculate the 2D surface brightness profile. - """ - if self.cf_radius is None or self.cf_value is None: - raise ValueError("cooling function profile missing") - ne = self.calc_density_electron() - # flux per unit volume - flux = self.cf_spline(self.r) * ne ** 2 / AstroParams.ratio_ne_np - # project the 3D flux - projector = Projection(rout=self.r+self.r_err) - brightness = projector.project(flux) - return brightness - def calc_density_electron(self): """ Calculate the electron number density from the gas mass density - if necessary. + if necessary, and fit a smoothing spline to it. """ if self.density_type == "electron": self.ne = self.d.copy() elif self.density_type == "gas": self.ne = self.d / AstroParams.m_atom / AstroParams.mu_e + # fit a smoothing spline + self.fit_spline(spline="electron", log10=True) return self.ne def calc_density_gas(self): """ Calculate the gas mass density from the electron number density - if necessary. + if necessary, and fit a smoothing spline to it. """ if self.density_type == "electron": self.rho_gas = self.d * AstroParams.mu_e * AstroParams.m_atom elif self.density_type == "gas": self.rho_gas = self.d.copy() + # fit a smoothing spline + self.fit_spline(spline="rho_gas", log10=True) return self.rho_gas - def fit_spline(self, verbose=False): + def calc_brightness(self): + """ + Project the electron number density or gas mass density profile + to calculate the 2D surface brightness profile. """ - Fit smoothing spline to the density profile, cooling function - profile, and temperature profile. + if self.cf_radius is None or self.cf_value is None: + raise ValueError("cooling function profile missing") + if self.cf_spline is None: + self.fit_spline(spline="cooling_function", log10=False) + # + ne = self.calc_density_electron() + # flux per unit volume + cf_new = self.eval_spline(spline="cooling_function", x=self.r) + flux = cf_new * ne ** 2 / AstroParams.ratio_ne_np + # project the 3D flux + projector = Projection(rout=self.r+self.r_err) + brightness = projector.project(flux) + return brightness - NOTE: - * Simple interpolation (`scipy.interpolate.interp1d` of kinds - `linear` or `quadratic` or `cubic`) may lead to a severely - oscillating curve (e.g., electron density profile), which - further cause problems for following mass calculation - due to the needs to take the derivative. - * `InterpolatedUnivariateSpline` or `UnivariateSpline` is a - better choice than the above `interp1d`. - * Allow cooling function profile and temperature profile to be - extrapolated by filling with the last value if necessary. + def fit_spline(self, spline, log10): """ - # density profile - # insert a data point at radius of zero - if verbose: - print("Fitting spline to density profile ...") - self.d_spline = interpolate.InterpolatedUnivariateSpline( - x=np.concatenate([[0.0], self.r]), - y=np.concatenate([[self.d[0]], self.d])) - if self.ne is not None: - if verbose: - print("Fitting spline to electron number density profile ...") - self.ne_spline = interpolate.InterpolatedUnivariateSpline( - x=np.concatenate([[0.0], self.r]), - y=np.concatenate([[self.ne[0]], self.ne])) - if self.rho_gas is not None: - if verbose: - print("Fitting spline to gas mass density profile ...") - self.rho_gas_spline = interpolate.InterpolatedUnivariateSpline( - x=np.concatenate([[0.0], self.r]), - y=np.concatenate([[self.rho_gas[0]], self.rho_gas])) - # cooling function profile - if verbose: - print("Fitting spline to cooling function profile ...") - self.cf_spline = interpolate.InterpolatedUnivariateSpline( - x=self.cf_radius, y=self.cf_value, ext="const") - # temperature profile - if verbose: - print("Fitting spline to temperature profile ...") - self.t_spline = interpolate.InterpolatedUnivariateSpline( - x=self.t_radius, y=self.t_value, ext="const") + Fit a smoothing line to the specified spline data, + by utilizing the R `mgcv::gam()` function. + + If 'log10' is True, the input data are first applied the log-scale + transform, and then fitted by the smoothing spline. + + The fitted spline allows extrapolation. + """ + if spline not in self.SPLINES: + raise ValueError("invalid spline: %s" % spline) + # + if spline == "density": + # given density profile (either electron / gas mass density) + if log10: + x = ro.FloatVector(np.log10(self.r)) + y = ro.FloatVector(np.log10(self.d)) + self.d_spline_log10 = True + else: + x = ro.FloatVector(self.r) + y = ro.FloatVector(self.s) + self.d_spline_log10 = False + self.d_spline = self.mgcv.gam( + ro.Formula("y ~ s(x, bs='ps')"), + data=ro.DataFrame({"x": x, "y": y}), + method="REML") + elif spline == "electron": + # input electron number density profile + if log10: + x = ro.FloatVector(np.log10(self.r)) + y = ro.FloatVector(np.log10(self.ne)) + self.ne_spline_log10 = True + else: + x = ro.FloatVector(self.r) + y = ro.FloatVector(self.ne) + self.ne_spline_log10 = False + self.ne_spline = self.mgcv.gam( + ro.Formula("y ~ s(x, bs='ps')"), + data=ro.DataFrame({"x": x, "y": y}), + method="REML") + elif spline == "rho_gas": + # input gas mass density profile + if log10: + x = ro.FloatVector(np.log10(self.r)) + y = ro.FloatVector(np.log10(self.rho_gas)) + self.rho_gas_spline_log10 = True + else: + x = ro.FloatVector(self.r) + y = ro.FloatVector(self.rho_gas) + self.rho_gas_spline_log10 = False + self.rho_gas_spline = self.mgcv.gam( + ro.Formula("y ~ s(x, bs='ps')"), + data=ro.DataFrame({"x": x, "y": y}), + method="REML") + elif spline == "cooling_function": + # input cooling function profile + if log10: + x = ro.FloatVector(np.log10(self.cf_radius)) + y = ro.FloatVector(np.log10(self.cf_value)) + self.cf_spline_log10 = True + else: + x = ro.FloatVector(self.cf_radius) + y = ro.FloatVector(self.cf_value) + self.cf_spline_log10 = False + self.cf_spline = self.mgcv.gam( + ro.Formula("y ~ s(x, bs='ps')"), + data=ro.DataFrame({"x": x, "y": y}), + method="REML") + elif spline == "temperature": + # input temperature profile + if log10: + x = ro.FloatVector(np.log10(self.t_radius)) + y = ro.FloatVector(np.log10(self.t_value)) + self.t_spline_log10 = True + else: + x = ro.FloatVector(self.t_radius) + y = ro.FloatVector(self.t_value) + self.t_spline_log10 = False + self.t_spline = self.mgcv.gam( + ro.Formula("y ~ s(x, bs='ps')"), + data=ro.DataFrame({"x": x, "y": y}), + method="REML") + elif spline == "mass_total": + # calculated total/gravitational mass profile + if log10: + x = ro.FloatVector(np.log10(self.radius)) + y = ro.FloatVector(np.log10(self.m_total)) + self.m_total_spline_log10 = True + else: + x = ro.FloatVector(self.radius) + y = ro.FloatVector(self.m_total) + self.m_total_spline_log10 = False + self.m_total_spline = self.mgcv.gam( + ro.Formula("y ~ s(x, bs='ps')"), + data=ro.DataFrame({"x": x, "y": y}), + method="REML") + elif spline == "rho_total": + # calculated total mass density profile + if log10: + x = ro.FloatVector(np.log10(self.radius)) + y = ro.FloatVector(np.log10(self.rho_total)) + self.rho_total_spline_log10 = True + else: + x = ro.FloatVector(self.radius) + y = ro.FloatVector(self.rho_total) + self.rho_total_spline_log10 = False + self.rho_total_spline = self.mgcv.gam( + ro.Formula("y ~ s(x, bs='ps')"), + data=ro.DataFrame({"x": x, "y": y}), + method="REML") + else: + raise ValueError("invalid spline: %s" % spline) + + def eval_spline(self, spline, x): + """ + Evaluate the specified spline at the supplied positions. + Also check whether the spline was fitted in the log-scale space, + and transform the evaluated values if necessary. + """ + x = np.array(x) + if x.shape == (): + x = x.reshape((1,)) + if spline == "density": + spl = self.d_spline + log10 = self.d_spline_log10 + elif spline == "electron": + spl = self.ne_spline + log10 = self.ne_spline_log10 + elif spline == "rho_gas": + spl = self.rho_gas_spline + log10 = self.rho_gas_spline_log10 + elif spline == "cooling_function": + spl = self.cf_spline + log10 = self.cf_spline_log10 + elif spline == "temperature": + spl = self.t_spline + log10 = self.t_spline_log10 + elif spline == "mass_total": + spl = self.m_total_spline + log10 = self.m_total_spline_log10 + elif spline == "rho_total": + spl = self.rho_total_spline + log10 = self.rho_total_spline_log10 + else: + raise ValueError("invalid spline: %s" % spline) + # + if log10: + x_new = ro.ListVector({"x": ro.FloatVector(np.log10(x))}) + y_pred = self.mgcv.predict_gam(spl, newdata=x_new) + y_pred = 10 ** np.array(y_pred) + else: + x_new = ro.ListVector({"x": ro.FloatVector(x)}) + y_pred = self.mgcv.predict_gam(spl, newdata=x_new) + y_pred = np.array(y_pred) + # + if len(y_pred) == 1: + return y_pred[0] + else: + return y_pred def gen_radius(self, num=1000): """ @@ -341,7 +485,7 @@ class DensityProfile: Reference: ref.[1], eq.(9) """ def _f_rho_gas(r): - return self.rho_gas_spline(r) * 4*np.pi * r**2 + return 4*np.pi * r**2 * self.eval_spline(spline="rho_gas", x=r) # m_gas = np.zeros(self.radius.shape) if verbose: @@ -368,6 +512,8 @@ class DensityProfile: """ if self.t_radius is None or self.t_value is None: raise ValueError("temperature profile required") + if self.t_spline is None: + self.fit_spline(spline="temperature", log10=False) # # calculate the coefficient of the total mass formula # M0 = (k_B * T_gas(r) * r) / (mu * m_atom * G) @@ -380,12 +526,15 @@ class DensityProfile: for i, r in enumerate(self.radius): if verbose and (i+1) % 100 == 0: print("%d..." % (i+1), end="", flush=True) + T = self.eval_spline(spline="temperature", x=r) + dT_dr = derivative(lambda r: self.eval_spline("temperature", r), + r, dx=0.01*au.kpc.to(au.cm)) + ne = self.eval_spline(spline="electron", x=r) + dne_dr = derivative(lambda r: self.eval_spline("electron", r), + r, dx=0.01*au.kpc.to(au.cm)) # enclosed total mass [ g ] - m_total[i] = - M0 * self.t_spline(r) * r * ( - ((r / self.t_spline(r)) * - derivative(self.t_spline, r, dx=0.01*au.kpc.to(au.cm))) + - ((r / self.ne_spline(r)) * - derivative(self.ne_spline, r, dx=0.01*au.kpc.to(au.cm)))) + m_total[i] = - M0 * T * r * (((r / T) * dT_dr) + + ((r / ne) * dne_dr)) if verbose: print("DONE!", flush=True) self.m_total = m_total @@ -396,21 +545,18 @@ class DensityProfile: Calculate the total mass density profile, which is required to calculate the following gravitational potential profile. """ - rho_total = np.zeros(self.radius.shape) + if self.m_total_spline is None: + self.fit_spline(spline="mass_total", log10=True) + # if verbose: print("Calculating the total mass density profile ...") - print(">>> Fitting spline to total mass profile ...") - self.m_total_spline = interpolate.InterpolatedUnivariateSpline( - x=self.radius, y=self.m_total, ext="const") + rho_total = np.zeros(self.radius.shape) for i, r in enumerate(self.radius): - rho_total[i] = (derivative(self.m_total_spline, r, - dx=0.01*au.kpc.to(au.cm)) / - (4 * np.pi * r**2)) + dM_dr = derivative(lambda r: self.eval_spline("mass_total", r), + r, dx=0.01*au.kpc.to(au.cm)) + rho_total[i] = (dM_dr / (4 * np.pi * r**2)) self.rho_total = rho_total - if verbose: - print(">>> Fitting spline to total mass density profile ...") - self.rho_total_spline = interpolate.InterpolatedUnivariateSpline( - x=self.radius, y=rho_total, ext="const") + return rho_total def calc_potential(self, verbose=True): """ @@ -423,20 +569,22 @@ class DensityProfile: potentials, but not the shape of the potential profile. """ def _int_inner(x): - return x**2 * self.rho_total_spline(x) + return x**2 * self.eval_spline(spline="rho_total", x=x) def _int_outer(x): - return x * self.rho_total_spline(x) + return x * self.eval_spline(spline="rho_total", x=x) if self.rho_total is None: self.calc_density_total(verbose=verbose) + if self.rho_total_spline is None: + self.fit_spline(spline="rho_total", log10=True) potential = np.zeros(self.radius.shape) if verbose: print("Calculating the potential profile (#%d): ..." % len(self.radius), end="", flush=True) r_max = max(self.radius) for i, r in enumerate(self.radius): - if verbose and (i+1) % 50 == 0: + if verbose and (i+1) % 10 == 0: print("%d..." % (i+1), end="", flush=True) potential[i] = - 4 * np.pi * ac.G.cgs.value * ( (1/r) * integrate.quad(_int_inner, a=0.0, b=r, @@ -524,7 +672,6 @@ def main(): density_profile.load_t_profile(t_profile) density_profile.calc_density_electron() density_profile.calc_density_gas() - density_profile.fit_spline(verbose=True) density_profile.gen_radius(num=config.as_int("num_dp")) density_profile.calc_mass_gas(verbose=True) density_profile.save(profile="mass_gas", |