#!/usr/bin/env python3 # # Aaron LI # Created: 2016-04-29 # Updated: 2016-06-25 # # Change log: # 2016-06-25: # * Use 'InterpolatedUnivariateSpline' instead of 'interp1d' # 2016-05-18: # * Roughly implement the PEI uncertainty estimation # * Fix/Update PEI Y positions determination # * Update `calc_pei()` results # 2016-05-04: # * Prepare the PEI uncertainty estimation support # # FIXME/TODO: # * improve the PEI uncertainty estimation approach (fix the bias) # """ Calculate the power excess index (PEI), which is defined the area ratio of the lower-left part with respect to the total rectangle, which is further defined by the radial power spectrum and the scale of 0.035R500 and 0.35R500, in the logarithmic space. Reference: Zhang, C., et al. 2016, ApJ """ import os import glob import argparse import json from collections import OrderedDict import numpy as np import scipy.interpolate as interpolate import scipy.integrate as integrate import matplotlib.pyplot as plt from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas from matplotlib.figure import Figure from matplotlib.path import Path import matplotlib.patches as patches from info import get_r500 __version__ = "0.5.0" __date__ = "2016-05-18" plt.style.use("ggplot") def calc_pei(data, r500, interp_np=101, pei_pos=None): """ Calculate the power excess index (PEI), which is defined the area ratio of the lower-left part with respect to the total rectangle. Arguments: * data: (frequency, power) power spectrum data * r500: R500 value in unit of the inverse of the above "frequency" * interp_np: number of data points interpolated to calculate PEI * pei_pos: position specification (i.e., xmin, xmax, ymin, ymax) of the PEI rectangle. If this argument is provided, then the PEI rectangle is just determined (i.e., r500 etc is ignored). This is a dictionary with the mentioned keys. e.g., the returned results of previous `calc_pei()`. """ freqs, psd1d = data # switch to the logarithmic scale mask = (freqs > 0.0) x = np.log10(freqs[mask]) y = np.log10(psd1d[mask]) # determine the X positions of PEI rectangle if pei_pos is not None: pei_xmin = pei_pos["pei_xmin"] pei_xmax = pei_pos["pei_xmax"] else: # frequency values corresponding to 0.35R500 and 0.035R500 pei_fmin = 1.0 / (0.350 * r500) pei_fmax = 1.0 / (0.035 * r500) pei_xmin = np.log10(pei_fmin) pei_xmax = np.log10(pei_fmax) # data points within the PEI range mask_pei = np.logical_and(x >= pei_xmin, x <= pei_xmax) y_pei = y[mask_pei] # interpolate the power spectrum by fitting a smoothing spline f_interp = interpolate.InterpolatedUnivariateSpline(x, y) x_interp = np.linspace(pei_xmin, pei_xmax, num=interp_np) y_interp = f_interp(x_interp) # determine the Y positions of PEI rectangle if pei_pos is not None: pei_ymin = pei_pos["pei_ymin"] pei_ymax = pei_pos["pei_ymax"] else: pei_ymin = min(np.min(y_interp), np.min(y_pei)) pei_ymax = max(np.max(y_interp), np.max(y_pei)) # # XXX/FIXME: # fixes the values that are smaller than the predefined `pei_ymin` # (due to large uncertainties of the PSD) # NOTE/FIXME: # Since the PEI rectangle is just *fixed* during the Monte Carlo error # estimation, and the values below the specified `pei_ymin` are just # ignored (i.e., force to be `pei_ymin`), therefore, the estimated error # of PEI is upwardly *biased*, i.e., the upper PEI uncertainty maybe # OK, but the lower PEI uncertainty is *underestimated*. # y_interp[y_interp < pei_ymin] = pei_ymin # calculate the PEI area_total = (pei_xmax - pei_xmin) * (pei_ymax - pei_ymin) area_below = integrate.trapz((y_interp-pei_ymin), x_interp) pei_value = area_below / area_total results = { "pei_xmin": pei_xmin, "pei_xmax": pei_xmax, "pei_ymin": pei_ymin, "pei_ymax": pei_ymax, "area_total": area_total, "area_below": area_below, "pei_value": pei_value, } data_interp_log10 = np.column_stack((x_interp, y_interp)) return (results, data_interp_log10) def estimate_pei_error(data, r500, pei_pos, mctimes=5000, verbose=False): """ Estimate the PEI error by Monte Carlo method. FIXME: * consider the R500 uncertainty """ eps = 1.0e-30 freqs = data[:, 0] psd1d = data[:, 1] psd1d_err = data[:, 2] psd1d_err[psd1d_err < eps] = eps r500, r500_err = r500 if verbose: print("Estimating PEI error by Monte Carlo (%d times) ..." % mctimes, end="", flush=True) pei_results = [] for i in range(mctimes): if verbose and i % 100 == 0: print("%d..." % i, end="", flush=True) psd1d_rand = np.random.normal(psd1d, psd1d_err) # Fix the values that are negative or too small psd1d_rand[psd1d_rand < eps] = eps # FIXME/XXX: how to consider the R500 uncertainty? # r500_rand = np.random.normal(r500, r500_err) pei, data_interp_log10 = calc_pei(data=(freqs, psd1d_rand), r500=r500, pei_pos=pei_pos) pei_results.append(pei["pei_value"]) if verbose: print("DONE!", flush=True) pei_mean = np.mean(pei_results) pei_std = np.std(pei_results) pei_q25, pei_median, pei_q75 = np.percentile(pei_results, q=(25, 50, 75)) results = { "pei_mean": pei_mean, "pei_std": pei_std, "pei_median": pei_median, "pei_q25": pei_q25, "pei_q75": pei_q75, } return results def plot_pei(data, data_interp_log10, info={}, ax=None, fig=None): """ Make a plot to visualize the PEI rectangular. """ if ax is None: fig, ax = plt.subplots(1, 1) # prepare data freqs = data[:, 0] psd1d = data[:, 1] psd1d_err = data[:, 2] x_interp = 10 ** data_interp_log10[:, 0] y_interp = 10 ** data_interp_log10[:, 1] pei_xmin = 10 ** info["pei_xmin"] pei_xmax = 10 ** info["pei_xmax"] pei_ymin = 10 ** info["pei_ymin"] pei_ymax = 10 ** info["pei_ymax"] # mask = (freqs > 0.0) xmin = np.min(freqs[mask]) / 1.2 xmax = np.max(freqs[mask]) ymin = np.min(psd1d[mask]) / 3.0 ymax = np.max(psd1d[mask] + psd1d_err[mask]) * 1.2 # ax.plot(freqs, psd1d, color="black", linestyle="none", marker="o", markersize=5, alpha=0.7) ax.errorbar(freqs, psd1d, yerr=psd1d_err, fmt="none", ecolor="blue", alpha=0.7) ax.set_xscale("log") ax.set_yscale("log") ax.set_xlim(xmin, xmax) ax.set_ylim(ymin, ymax) ax.set_title("Power Spectral Density & PEI (%s; %d)" % (info.get("name"), info.get("obsid"))) ax.set_xlabel(r"k (pixel$^{-1}$)") ax.set_ylabel("Power") ax.text(x=xmax/1.1, y=ymax/1.2, s="PEI = %.2f / %.2f = %.2f" % (info.get("area_below"), info.get("area_total"), info.get("pei")), horizontalalignment="right", verticalalignment="top") # plot the interpolated data points and the PEI rectangle # credit: http://matplotlib.org/users/path_tutorial.html ax.plot(x_interp, y_interp, linestyle="--", marker="D", markersize=2, color="green", alpha=0.9) vertices = [ (pei_xmin, pei_ymin), # left, bottom (pei_xmin, pei_ymax), # left, top (pei_xmax, pei_ymax), # right, top (pei_xmax, pei_ymin), # right, bottom (pei_xmin, pei_ymin), # ignored ] codes = [ Path.MOVETO, Path.LINETO, Path.LINETO, Path.LINETO, Path.CLOSEPOLY, ] path = Path(vertices, codes) patch = patches.PathPatch(path, fill=False, color="green", linewidth=2, alpha=0.9) ax.add_patch(patch) fig.tight_layout() return (fig, ax) def main(): # default arguments default_infile = "psd.txt" default_outfile = "pei.json" default_infojson = "../*_INFO.json" default_mctimes = 5000 parser = argparse.ArgumentParser( description="Calculate the power excess index (PEI)", epilog="Version: %s (%s)" % (__version__, __date__)) parser.add_argument("-V", "--version", action="version", version="%(prog)s " + "%s (%s)" % (__version__, __date__)) parser.add_argument("-v", "--verbose", dest="verbose", required=False, action="store_true", help="show verbose information") parser.add_argument("-m", "--mctimes", dest="mctimes", required=False, type=int, default=default_mctimes, help="number of MC times to estimate PEI error " + "(default: %d)" % default_mctimes) parser.add_argument("-j", "--json", dest="json", required=False, help="the *_INFO.json file " + "(default: find %s)" % default_infojson) parser.add_argument("-i", "--infile", dest="infile", required=False, default=default_infile, help="input data of the radial power spectrum " + "(default: %s)" % default_infile) parser.add_argument("-o", "--outfile", dest="outfile", required=False, default=default_outfile, help="output json file to save the results " + "(default: %s)" % default_outfile) parser.add_argument("-p", "--png", dest="png", default=None, help="make PEI plot and save " + "(default: same basename as outfile)") args = parser.parse_args() if args.png is None: args.png = os.path.splitext(args.outfile)[0] + ".png" info_json = glob.glob(default_infojson)[0] if args.json: info_json = args.json json_str = open(info_json).read().rstrip().rstrip(",") info = json.loads(json_str) name = info["Source Name"] obsid = int(info["Obs. ID"]) r500 = get_r500(info) r500_pix = r500["r500_pix"] r500_err_pix = (abs(r500["r500EL_pix"]) + abs(r500["r500EU_pix"])) / 2 psd_data = np.loadtxt(args.infile) freqs = psd_data[:, 0] psd1d = psd_data[:, 1] pei, data_interp_log10 = calc_pei(data=(freqs, psd1d), r500=r500_pix) pei_err = estimate_pei_error(psd_data, r500=(r500_pix, r500_err_pix), pei_pos=pei, mctimes=args.mctimes, verbose=args.verbose) pei_data = OrderedDict([ ("name", name), ("obsid", obsid), ("r500_pix", r500_pix), ("r500_err_pix", r500_err_pix), ("pei_xmin", pei["pei_xmin"]), ("pei_xmax", pei["pei_xmax"]), ("pei_ymin", pei["pei_ymin"]), ("pei_ymax", pei["pei_ymax"]), ("area_total", pei["area_total"]), ("area_below", pei["area_below"]), ("pei", pei["pei_value"]), ("pei_mean", pei_err["pei_mean"]), ("pei_std", pei_err["pei_std"]), ("pei_median", pei_err["pei_median"]), ("pei_q25", pei_err["pei_q25"]), ("pei_q75", pei_err["pei_q75"]), ("mc_times", args.mctimes), ]) pei_data_json = json.dumps(pei_data, indent=2) print(pei_data_json) open(args.outfile, "w").write(pei_data_json+"\n") # Make and save a plot fig = Figure(figsize=(10, 8)) FigureCanvas(fig) ax = fig.add_subplot(111) plot_pei(psd_data, data_interp_log10, info=pei_data, ax=ax, fig=fig) fig.savefig(args.png, format="png", dpi=150) if __name__ == "__main__": main() # vim: set ts=4 sw=4 tw=0 fenc=utf-8 ft=python: #