From f27f9326554802ef21250587f0f2252c6a1d7139 Mon Sep 17 00:00:00 2001 From: Aaron LI Date: Wed, 13 Jul 2016 17:11:06 +0800 Subject: calc_overdensity.py: Use class "SmoothSpline" from module "spline.py" --- calc_overdensity.py | 78 +++++++++++------------------------------------------ 1 file changed, 16 insertions(+), 62 deletions(-) diff --git a/calc_overdensity.py b/calc_overdensity.py index 562311e..501f5dd 100755 --- a/calc_overdensity.py +++ b/calc_overdensity.py @@ -2,9 +2,11 @@ # # Aaron LI # Created: 2016-06-30 -# Updated: 2016-07-11 +# Updated: 2016-07-13 # # Change logs: +# 2016-07-13: +# * Use class 'SmoothSpline' from module 'spline.py' # 2016-07-11: # * Use a default config to allow a minimal user config # 2016-07-04: @@ -35,10 +37,8 @@ import astropy.units as au from astropy.cosmology import FlatLambdaCDM from configobj import ConfigObj -import rpy2.robjects as ro -from rpy2.robjects.packages import importr - from astro_params import AstroParams +from spline import SmoothSpline config_default = """ @@ -83,11 +83,7 @@ class MassProfile: redshift = None # fitted smoothing spline m_spline = None - m_spline_log10 = None od_spline = None - od_spline_log10 = None - # call R through `rpy2` - mgcv = importr("mgcv") def __init__(self, mass, mass_type="total"): self.load_data(data=mass, mass_type=mass_type) @@ -130,7 +126,7 @@ class MassProfile: Calculate the radius at which the overdensity is delta. """ if self.od_spline is None: - self.fit_spline(spline="overdensity", log10=True) + self.fit_spline(spline="overdensity", log10=["x", "y"]) if min(self.overdensity) > delta: raise ValueError("min(overdensity) > %d" % delta) r = optimize.newton( @@ -140,7 +136,7 @@ class MassProfile: def calc_mass_delta(self, r_delta): if self.m_spline is None: - self.fit_spline(spline="mass", log10=True) + self.fit_spline(spline="mass", log10=["x", "y"]) return self.eval_spline(spline="mass", x=r_delta) def save(self, outfile): @@ -153,49 +149,24 @@ class MassProfile: header = "radius[kpc] radius_err[kpc] overdensity" np.savetxt(outfile, data, header=header) - def fit_spline(self, spline, log10): - """ - Fit a smoothing line to the specified spline data, - by utilizing the R `mgcv::gam()` function. - - If 'log10' is True, the input data are first applied the log-scale - transform, and then fitted by the smoothing spline. - - The fitted spline allows extrapolation. - """ + def fit_spline(self, spline, log10=[]): if spline not in self.SPLINES: raise ValueError("invalid spline: %s" % spline) # if spline == "mass": # input gas/total mass profile - if log10: - x = ro.FloatVector(np.log10(self.r)) - y = ro.FloatVector(np.log10(self.m)) - self.m_spline_log10 = True - else: - x = ro.FloatVector(self.r) - y = ro.FloatVector(self.m) - self.m_spline_log10 = False - self.m_spline = self.mgcv.gam( - ro.Formula("y ~ s(x, bs='ps')"), - data=ro.DataFrame({"x": x, "y": y}), - method="REML") + x = self.r + y = self.m + spl = "m_spline" elif spline == "overdensity": # calculated overdensity profile - if log10: - x = ro.FloatVector(np.log10(self.r)) - y = ro.FloatVector(np.log10(self.overdensity)) - self.od_spline_log10 = True - else: - x = ro.FloatVector(self.radius) - y = ro.FloatVector(self.rho_total) - self.od_spline_log10 = False - self.od_spline = self.mgcv.gam( - ro.Formula("y ~ s(x, bs='ps')"), - data=ro.DataFrame({"x": x, "y": y}), - method="REML") + x = self.radius + y = self.rho_total + spl = "od_spline" else: raise ValueError("invalid spline: %s" % spline) + setattr(self, spl, SmoothSpline(x=x, y=y)) + getattr(self, spl).fit(log10=log10) def eval_spline(self, spline, x): """ @@ -203,31 +174,14 @@ class MassProfile: Also check whether the spline was fitted in the log-scale space, and transform the evaluated values if necessary. """ - x = np.array(x) - if x.shape == (): - x = x.reshape((1,)) if spline == "mass": spl = self.m_spline - log10 = self.m_spline_log10 elif spline == "overdensity": spl = self.od_spline - log10 = self.od_spline_log10 else: raise ValueError("invalid spline: %s" % spline) # - if log10: - x_new = ro.ListVector({"x": ro.FloatVector(np.log10(x))}) - y_pred = self.mgcv.predict_gam(spl, newdata=x_new) - y_pred = 10 ** np.array(y_pred) - else: - x_new = ro.ListVector({"x": ro.FloatVector(x)}) - y_pred = self.mgcv.predict_gam(spl, newdata=x_new) - y_pred = np.array(y_pred) - # - if len(y_pred) == 1: - return y_pred[0] - else: - return y_pred + return spl.eval(x) def main(): -- cgit v1.2.2