diff options
author | Aaron LI <aaronly.me@outlook.com> | 2016-06-13 23:37:33 +0800 |
---|---|---|
committer | Aaron LI <aaronly.me@outlook.com> | 2016-06-13 23:37:33 +0800 |
commit | 7dc84ad3586e6c8fe27dc9482de0331ca066288e (patch) | |
tree | e521e753966cd94d315de5c5ad3ad5ba1ffc6a86 | |
parent | eb938f3f590b311761530e9057473943d1ade146 (diff) | |
download | cexcess-7dc84ad3586e6c8fe27dc9482de0331ca066288e.tar.bz2 |
deproject_sbp.py: implement primitive SBP deprojection approach
-rwxr-xr-x | deproject_sbp.py | 303 |
1 files changed, 302 insertions, 1 deletions
diff --git a/deproject_sbp.py b/deproject_sbp.py index 8881b2f..94193e0 100755 --- a/deproject_sbp.py +++ b/deproject_sbp.py @@ -10,17 +10,25 @@ # References: # [1] Croston et al. 2006, A&A, 459, 1007-1019 # [2] McLaughlin, 1999, ApJ, 117, 2398-2427 +# [3] Bouchet, 1995, A&AS, 113, 167 # # # Weitian LI # Created: 2016-06-10 -# Updated: 2016-06-10 +# Updated: 2016-06-13 +# +# Change logs: +# 2016-06-13: +# * Add class 'ABModel' to support data scaling +# * Implement primitive SBP deprojection approach for class 'DeprojectSBP' # import argparse import numpy as np +import scipy.optimize +import lmfit class Projection: @@ -159,6 +167,299 @@ def testProjection(): print("All tests PASSED!") +class ABModel: + """ + AB model is a modified version of the beta model, which can roughly + fit both centrally peaked and cored models, e.g., central excess emission. + + References: + [1] Pratt & Arnaud, 2002, A&A, 394, 375; eq.(2) + [2] ref.[1], eq.(10) + """ + name = "AB model" + # model parameters + params = lmfit.Parameters() + params.add_many( # (name, value, vary, min, max, expr) + ("A", 1.0e-9, True, 0.0, 1.0e-5, None), + ("alpha", 0.7, True, 0.1, 1.1, None), + ("rc", 30.0, True, 1.0, 1.0e4, None), + ("beta", 0.7, True, 0.3, 1.1, None)) + # optimization method + fit_method = "lbfgsb" + # whether the 'ydata' and 'yerr' to be scaled in order to reduce + # the dynamical range for a more stable fitting + scale = False + scale_factor = 1.0 + + def __init__(self, fit_method="lbfgsb", params=None, scale=True): + self.fit_method = fit_method + if params is not None: + self.load_params(params) + self.scale = scale + + def load_data(self, xdata, ydata=None, xerr=None, yerr=None, + update_params=False): + self.reset() + if xdata.ndim == 2 and xdata.shape[1] == 4: + # 4-column data + self.xdata = xdata[:, 0].copy() + self.xerr = xdata[:, 1].copy() + self.ydata = xdata[:, 2].copy() + self.yerr = xdata[:, 3].copy() + else: + self.xdata = np.array(xdata) + self.ydata = np.array(ydata) + self.xerr = np.array(xerr) + self.yerr = np.array(yerr) + self.scale_data(update_params=update_params) + + def scale_data(self, update_params=False): + if self.scale: + y_min = np.min(self.ydata) + y_max = np.max(self.ydata) + self.scale_factor = np.exp(np.log(y_min*y_max) / 2) + self.ydata /= self.scale_factor + self.yerr /= self.scale_factor + if update_params: + A_min = 1.0 + A_max = np.max(self.ydata) + self.set_param("A", value=(A_min+A_max)*0.5, + min=A_min, max=A_max) + + def reset(self): + self.fitter = None + self.fitted = None + + def f_residual(self, params): + if self.yerr is None: + return self.model(self.xdata, params) - self.ydata + else: + return (self.model(self.xdata, params) - self.ydata) / self.yerr + + def fit(self, method=None): + if method is None: + method = self.fit_method + self.fitter = lmfit.Minimizer(self.f_residual, self.params) + self.fitted = self.fitter.minimize(method=method) + self.load_params(self.fitted.params) + + @staticmethod + def model(x, params): + parvals = params.valuesdict() + A = parvals["A"] + alpha = parvals["alpha"] + rc = parvals["rc"] + beta = parvals["beta"] + return (A * np.power(x/rc, -alpha) * + np.power((1 + (x/rc)**2), -1.5*beta + 0.5*alpha)) + + def f(self, x): + return self.model(x, self.params) * self.scale_factor + + def get_param(self, name=None): + """ + Return the requested 'Parameter' object or the whole + 'Parameters' object of no name supplied. + """ + try: + return self.params[name] + except KeyError: + return self.params + + def set_param(self, name, *args, **kwargs): + """ + Set the properties of the specified parameter. + """ + param = self.params[name] + param.set(*args, **kwargs) + + def dump_params(self, serialize=True): + """ + Dump the current values/settings for all model parameters, + and these dumped results can be later loaded by 'load_params()'. + """ + if serialize: + return self.params.dumps() + else: + return self.params.copy() + + def load_params(self, params): + """ + Load the provided parameters values/settings. + """ + if isinstance(params, lmfit.parameter.Parameters): + self.params = params.copy() + else: + p = lmfit.parameter.Parameters() + p.loads(params) + self.params = p + + +class DeprojectSBP: + """ + Deproject the observed SBP to derive the 3D emission measure (EM) + profile, using a regularization technique. + + References: ref.[1] + """ + # input SBP data: [r, r_err, s, s_err] + r = None + r_err = None + s = None + s_err = None + # 'Projection' instance for this SBP + projector = None + # 'ABModel' instance to fit the deprojected EM profile for rescaling data + abmodel = None + # smoothing parameter to balance between fidelity (chisq) and + # consistency with the applied regularization constraint. + lbd = 1.0 + # optimization method for scipy minimize + opt_method = "Powell" + # scipy optimize results from 'self.deproject()' + deproject_res = None + #### + debug_xlist = [] + + def __init__(self, r, r_err=None, s=None, s_err=None, + lbd=1.0, opt_method="Powell"): + self.load_data(r=r, r_err=r_err, s=s, s_err=s_err) + self.projector = Projection(rout=self.r+self.r_err) + self.abmodel = ABModel(scale=True) + self.lbd = lbd + self.opt_method = opt_method + self.debug_xlist = [] + + def load_data(self, r, r_err=None, s=None, s_err=None): + if r.ndim == 2 and r.shape[1] == 4: + # 4-column data + self.r = r[:, 0].copy() + self.r_err = r[:, 1].copy() + self.s = r[:, 2].copy() + self.s_err = r[:, 3].copy() + else: + self.r = np.array(r) + self.r_err = np.array(r_err) + self.s = np.array(s) + self.s_err = np.array(s_err) + + def deproject(self, lbd=None, opt_method=None): + """ + Deproject the observed SBP to derive the 3D EM profile by + minimizing the objective function. + """ + def fobj(x): + return self.f_objective(x, scaled=True) + + def callback(x): + # NOTE: 'x' here is the scaled EM solution + x_unscaled = self.unscale_data(x) + self.update_abmodel(x_unscaled) + + if lbd is not None: + self.lbd = lbd + if opt_method is None: + opt_method = self.opt_method + # initial guess + em0 = self.projector.deproject(self.s) + # scale the EM data to reduce the dynamical range + em0_scaled = self.scale_data(em0, update_params=True) + # DEBUG + self.debug_xlist.append(em0_scaled) + res = scipy.optimize.minimize(fun=fobj, x0=em0_scaled, + method=opt_method, + callback=callback, + options={"disp": True}) + self.deproject_res = res + self.em = self.unscale_data(res.x) + return self.em + + def update_abmodel(self, x, xerr=None, update_params=False): + """ + Load the supplied data into self.abmodel, and perform fitting. + + If the errors/uncertainties is not specified, it is assumed + to have the same relative errors as the observed SBP. + """ + self.debug_xlist.append(x) + if xerr is None: + x_err = x * self.s_err / self.s + self.abmodel.load_data(xdata=self.r, xerr=self.r_err, + ydata=x, yerr=x_err, + update_params=update_params) + self.abmodel.fit() + + def scale_data(self, x, xerr=None, update_params=False): + """ + Scale the data (i.e., 3D EM profile) by dividing the fitted + AB model. + + If the errors/uncertainties is not specified, it is assumed + to have the same relative errors as the observed SBP. + """ + self.update_abmodel(x=x, xerr=xerr, update_params=update_params) + x_fitted = self.abmodel.f(self.r) + x_scaled = x / x_fitted + return x_scaled + + def unscale_data(self, x): + """ + Undo the data scaling by multiplying the same fitted model + previously used to scale the data. + """ + x_fitted = self.abmodel.f(self.r) + x_unscaled = x * x_fitted + return x_unscaled + + def f_objective(self, x, scaled=False): + """ + The objective function to be minimized, in order to derive the + best solution (i.e., deprojected SBP) for the observed SBP. + + This objective function is a combination of plain chi-squared + and a regularization constraint. + + 'lbd' is the parameter to balance the goodness-of-fit and the + regularization constraint. + + References: ref.[1], eq.(2) + """ + return (self.f_chisq(x, scaled=scaled) + + self.lbd * self.f_constraint(x, scaled=scaled)) + + def f_residual(self, x, scaled=False): + """ + Calculate the residuals of each data point for the solution. + + The current solution (i.e., 3D EM profile) is first projected + into the 2D SBP, then compared to the observed SBP. + """ + if scaled: + x = self.unscale_data(x) + x_2d = self.projector.project(x) + residuals = (x_2d - self.s) / self.s_err + return residuals + + def f_chisq(self, x, scaled=False): + """ + Function to calculate the chi-squared value of the current + solution with respect to the data. + """ + chisq = np.sum(self.f_residual(x, scaled=scaled) ** 2) + return chisq + + def f_constraint(self, x, scaled=False): + """ + Function to calculate the value of regularization constraint. + + References: ref.[1], eq.(3) + """ + if not scaled: + x = self.scale_data(x) + constraint = np.sum((x[:-1] + x[1:]) ** 2) + return constraint + + def main(): parser = argparse.ArgumentParser( description="Deproject the surface brightness profile (SBP)") |