From 4c377ad4fbe98f85fcaedfc332da9c44fcecbc23 Mon Sep 17 00:00:00 2001 From: Aaron LI Date: Thu, 15 Nov 2018 23:46:26 +0800 Subject: Move fits/fits{cube,image}.py --- astro/fits/fitscube.py | 788 ------------------------------------------------ astro/fits/fitsimage.py | 522 -------------------------------- astro/fitscube.py | 788 ++++++++++++++++++++++++++++++++++++++++++++++++ astro/fitsimage.py | 522 ++++++++++++++++++++++++++++++++ 4 files changed, 1310 insertions(+), 1310 deletions(-) delete mode 100755 astro/fits/fitscube.py delete mode 100755 astro/fits/fitsimage.py create mode 100755 astro/fitscube.py create mode 100755 astro/fitsimage.py diff --git a/astro/fits/fitscube.py b/astro/fits/fitscube.py deleted file mode 100755 index 404425d..0000000 --- a/astro/fits/fitscube.py +++ /dev/null @@ -1,788 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (c) 2017-2018 Weitian LI -# MIT license -# - -""" -FITS image cube manipulation tool. - -This tool was originally developed to create a FITS image cube from a -series of CT scan slices to help better visualize/examine them in the -sophisticated SAOImage DS9 software. Each slice in the cube is a CT -image at a position from the CT scan, with the z-axis tracking the slice -positions (equal-distant) in units of, e.g., [cm]. - -Then this tool was significantly improved to deal with the spectral cube -in radio astronomy, with each slice representing the radio sky at a -certain frequency (channel), so the z-axis records the frequency in -units of [Hz]. - -For example, we simulate the observed image using OSKAR and WSClean one -frequency channel at a time, then use this tool to combine them into -a spectral cube, from which the 2D and 1D power spectra is derived. - -The ``calibrate`` sub-command is used to calibrate the frequency channel -responses to make them spectrally smooth by fitting a low-order polynomial. - -The ``corrupt`` sub-command is used to corrupt the frequency channel -responses to simulate that real instrument suffers from calibration -imperfections. -""" - -import os -import sys -import argparse -from datetime import datetime, timezone -from functools import lru_cache - -import numpy as np -from astropy.io import fits -from astropy.wcs import WCS - - -class FITSCube: - """ - FITS image cube. - """ - def __init__(self, infile=None): - if infile is not None: - self.load(infile) - - def load(self, infile): - with fits.open(infile) as f: - self.data = f[0].data - self.header = f[0].header - print("Loaded FITS cube from file: %s" % infile) - print("Cube dimensions: %dx%dx%d" % - (self.width, self.height, self.nslice)) - # The Z-axis position of the first slice. - self.zbegin = self.header["CRVAL3"] - # The Z-axis step/spacing between slices. - self.zstep = self.header["CDELT3"] - - def add_slices(self, infiles, zbegin=0.0, zstep=1.0): - """ - Create a FITS cube from input image slices. - """ - self.infiles = infiles - self.zbegin = zbegin - self.zstep = zstep - nslice = len(infiles) - header, image = self.open_image(infiles[0]) - shape = (nslice, ) + image.shape - data = np.zeros(shape, dtype=image.dtype) - for i, fn in enumerate(infiles): - print("[%d/%d] Adding image slice: %s ..." % (i+1, nslice, fn)) - hdr, img = self.open_image(fn) - data[i, :, :] = img - self.data = data - self.header = header.copy(strip=True) - print("Created FITS cube of dimensions: %dx%dx%d" % - (self.width, self.height, self.nslice)) - - @staticmethod - def open_image(infile): - """ - Open the slice image and return its header and 2D image data. - - NOTE - ---- - The input slice image may have following dimensions: - * NAXIS=2: [Y, X] - * NAXIS=3: [FREQ=1, Y, X] - * NAXIS=4: [STOKES=1, FREQ=1, Y, X] - - NOTE - ---- - Only open slice image that has only ONE frequency and ONE Stokes - parameter. - - Returns - ------- - header : `~astropy.io.fits.Header` - image : 2D `~numpy.ndarray` - The 2D [Y, X] image part of the slice image. - """ - with fits.open(infile) as f: - header = f[0].header - data = f[0].data - if data.ndim == 2: - # NAXIS=2: [Y, X] - image = data - elif data.ndim == 3 and data.shape[0] == 1: - # NAXIS=3: [FREQ=1, Y, X] - image = data[0, :, :] - elif data.ndim == 4 and data.shape[0] == 1 and data.shape[1] == 1: - # NAXIS=4: [STOKES=1, FREQ=1, Y, X] - image = data[0, 0, :, :] - else: - raise ValueError("Slice '{0}' has invalid dimensions: {1}".format( - infile, data.shape)) - return (header, image) - - @property - def header(self): - if not hasattr(self, "header_"): - self.header_ = fits.Header() - return self.header_ - - @header.setter - def header(self, value): - self.header_ = value - for key in ["CTYPE4", "CRPIX4", "CRVAL4", "CDELT4", "CUNIT4"]: - try: - del self.header_[key] - except KeyError: - pass - - @property - @lru_cache() - def wcs(self): - w = WCS(naxis=3) - w.wcs.ctype = ["pixel", "pixel", "pixel"] - w.wcs.crpix = np.array([self.header.get("CRPIX1", 1.0), - self.header.get("CRPIX2", 1.0), - 1.0]) - w.wcs.crval = np.array([self.header.get("CRVAL1", 0.0), - self.header.get("CRVAL2", 0.0), - self.zbegin]) - w.wcs.cdelt = np.array([self.header.get("CDELT1", 1.0), - self.header.get("CDELT2", 1.0), - self.zstep]) - return w - - def keyword(self, key, value=None, comment=None): - header = self.header - if value is None: - return header[key] - else: - header[key] = (value, comment) - - def write(self, outfile, clobber=False): - header = self.header - header.extend(self.wcs.to_header(), update=True) - header["DATE"] = (datetime.now(timezone.utc).astimezone().isoformat(), - "File creation date") - header.add_history(" ".join(sys.argv)) - hdu = fits.PrimaryHDU(data=self.data, header=header) - try: - hdu.writeto(outfile, overwrite=clobber) - except TypeError: - hdu.writeto(outfile, clobber=clobber) - - @property - def width(self): - __, __, w = self.data.shape - return w - - @property - def height(self): - __, h, __ = self.data.shape - return h - - @property - def nslice(self): - ns, __, __ = self.data.shape - return ns - - @property - @lru_cache() - def zvalues(self): - """ - Calculate the Z-axis positions for all slices - """ - nslice = self.nslice - wcs = self.wcs - pix = np.zeros(shape=(nslice, 3), dtype=int) - pix[:, 2] = np.arange(nslice) - world = wcs.wcs_pix2world(pix, 0) - return world[:, 2] - - @property - def slices(self): - """ - A list of slices in the cube w.r.t. ``zvalues``. - """ - return (self.data[i, :, :] for i in range(self.nslice)) - - def get_slice(self, i, csize=None): - """ - Get the i-th (0-based) slice image, and crop out the central box - of size ``csize`` if specified. - """ - if csize is None: - return self.data[i, :, :] - else: - rows, cols = self.height, self.width - rc, cc = rows//2, cols//2 - cs1, cs2 = csize//2, (csize+1)//2 - return self.data[i, (rc-cs1):(rc+cs2), (cc-cs1):(cc+cs2)] - - def apply_gain(self, gain): - """ - Multiply the supplied ``gain`` to each slice, to achieve slice - or channel response calibration or corruption. - """ - gain = np.asarray(gain) - self.data *= gain[:, np.newaxis, np.newaxis] - - def pool(self, blocksize, func=np.mean): - """ - Down-sampling the images by pooling - """ - try: - from skimage.measure import block_reduce - except ImportError: - print("scikit-image not installed") - raise - - self.data = block_reduce(self.data, - block_size=(1, blocksize, blocksize), - func=func) - self.keyword(key="POOL_BS", value=blocksize, - comment="down-sampling block size") - self.keyword(key="POOL_FUN", value=func.__name__, - comment="down-sampling function/method") - - @property - def unit(self): - """ - Cube data unit. - """ - return self.header.get("BUNIT") - - @unit.setter - def unit(self, value): - self.header["BUNIT"] = value - - @property - def zunit(self): - """ - Unit of the slice z-axis positions. - """ - return self.header.get("CUNIT3") - - @zunit.setter - def zunit(self, value): - self.header["CUNIT3"] = value - - -def cmd_info(args): - """ - Sub-command: "info", show FITS cube information - """ - cube = FITSCube(args.infile) - if cube.zunit: - pzunit = " [%s]" % cube.zunit - else: - pzunit = "" - zvalues = cube.zvalues - nslice = cube.nslice - print("Data cube unit: %s" % cube.unit) - print("Image/slice size: %dx%d" % (cube.width, cube.height)) - print("Number of slices: %d" % nslice) - print("Slice step/spacing: %s%s" % (cube.zstep, pzunit)) - print("Slice positions: %s <-> %s%s" % - (zvalues.min(), zvalues.max(), pzunit)) - if args.stats: - mean = np.zeros(nslice) - std = np.zeros(nslice) - rms = np.zeros(nslice) - for i in range(nslice): - image = cube.get_slice(i, csize=args.center) - if args.abs: - image = np.abs(image) - mean[i] = np.mean(image) - std[i] = np.std(image) - rms[i] = np.sqrt(np.mean(image**2)) - print("Slice ") - for i, z in enumerate(zvalues): - print("* %12.4e: %12.4e %12.4e %12.4e" % - (z, mean[i], std[i], rms[i])) - if args.outfile: - data = np.column_stack([zvalues, mean, std, rms]) - np.savetxt(args.outfile, data, header="z mean std rms") - print("Saved statistics data to file: %s" % args.outfile) - - -def cmd_create(args): - """ - Sub-command: "create", create a FITS cube - """ - if not args.clobber and os.path.exists(args.outfile): - raise FileExistsError("output file already exists: %s" % args.outfile) - cube = FITSCube() - cube.add_slices(args.infiles, zbegin=args.zbegin, zstep=args.zstep) - cube.zunit = args.zunit - if args.unit: - cube.unit = args.unit - cube.write(args.outfile, clobber=args.clobber) - print("Created FITS cube: %s" % args.outfile) - - -def cmd_crop(args): - """ - Sub-command: "crop", crop the central region of a FITS cube - """ - if not args.clobber and os.path.exists(args.outfile): - raise FileExistsError("output file already exists: %s" % args.outfile) - - cube = FITSCube(args.infile) - print("Image/slice size: %dx%d" % (cube.width, cube.height)) - print("Cropping region size: %dx%d" % (args.size, args.size)) - s_width = slice((cube.width - args.size) // 2, - (cube.width + args.size) // 2) - s_height = slice((cube.height - args.size) // 2, - (cube.height + args.size) // 2) - cube.data = cube.data[:, s_height, s_width] - print("Saving FITS cube ...") - cube.write(args.outfile, clobber=args.clobber) - print("Created FITS cube: %s" % args.outfile) - - -def cmd_add(args): - """ - Sub-command: "add", add two or more FITS cubes - """ - if not args.clobber and os.path.exists(args.outfile): - raise FileExistsError("output file already exists: %s" % args.outfile) - if len(args.infiles) < 2: - raise RuntimeError("Two or more input FITS cubes required") - - cube = FITSCube(args.infiles[0]) - print("Data cube unit: %s" % cube.unit) - print("Image/slice size: %dx%d" % (cube.width, cube.height)) - print("Number of slices: %d" % cube.nslice) - - for f in args.infiles[1:]: - cube2 = FITSCube(f) - assert (cube.unit, cube.zunit) == (cube2.unit, cube2.zunit) - print("Adding cube %s ..." % f) - cube.data = cube.data + cube2.data - - print("Saving FITS cube ...") - cube.write(args.outfile, clobber=args.clobber) - print("Saved FITS cube: %s" % args.outfile) - - -def cmd_mul(args): - """ - Sub-command: "mul", multiply two or more FITS cubes - """ - if not args.clobber and os.path.exists(args.outfile): - raise FileExistsError("output file already exists: %s" % args.outfile) - if len(args.infiles) < 2: - raise RuntimeError("Two or more input FITS cubes required") - - cube = FITSCube(args.infiles[0]) - print("Data cube unit: %s" % cube.unit) - print("Image/slice size: %dx%d" % (cube.width, cube.height)) - print("Number of slices: %d" % cube.nslice) - - for f in args.infiles[1:]: - cube2 = FITSCube(f) - assert (cube.unit, cube.zunit) == (cube2.unit, cube2.zunit) - print("Multiplying cube %s ..." % f) - cube.data = cube.data * cube2.data - - print("Saving FITS cube ...") - cube.write(args.outfile, clobber=args.clobber) - print("Saved FITS cube: %s" % args.outfile) - - -def cmd_sub(args): - """ - Sub-command: "sub", subtract one FITS cube by another one - """ - if not args.clobber and os.path.exists(args.outfile): - raise FileExistsError("output file already exists: %s" % args.outfile) - - cube = FITSCube(args.infile) - print("Data cube unit: %s" % cube.unit) - print("Image/slice size: %dx%d" % (cube.width, cube.height)) - print("Number of slices: %d" % cube.nslice) - - cube2 = FITSCube(args.infile2) - assert (cube.unit, cube.zunit) == (cube2.unit, cube2.zunit) - print("Subtracting cube %s ..." % args.infile2) - cube.data = cube.data - cube2.data - - print("Saving FITS cube ...") - cube.write(args.outfile, clobber=args.clobber) - print("Saved FITS cube: %s" % args.outfile) - - -def cmd_div(args): - """ - Sub-command: "div", divide one FITS cube by another one - """ - if not args.clobber and os.path.exists(args.outfile): - raise FileExistsError("output file already exists: %s" % args.outfile) - - cube = FITSCube(args.infile) - print("Data cube unit: %s" % cube.unit) - print("Image/slice size: %dx%d" % (cube.width, cube.height)) - print("Number of slices: %d" % cube.nslice) - - cube2 = FITSCube(args.infile2) - assert (cube.unit, cube.zunit) == (cube2.unit, cube2.zunit) - print("Dividing cube %s ..." % args.infile2) - with np.errstate(divide='warn'): - cube.data = cube.data / cube2.data - - if args.fill_value: - print("Filling invalid data with: %s" % args.fill_value) - cube.data[~np.isfinite(cube.data)] = float(args.fill_value) - - print("Saving FITS cube ...") - cube.write(args.outfile, clobber=args.clobber) - print("Saved FITS cube: %s" % args.outfile) - - -def cmd_calibrate(args): - """ - Sub-command: "calibrate", calibrate the z-axis slice/channel responses - by fitting a polynomial. - """ - if not args.dryrun: - if args.outfile is None: - raise ValueError("--outfile required") - elif not args.clobber and os.path.exists(args.outfile): - raise OSError("output file already exists: %s" % args.outfile) - - cube = FITSCube(args.infile) - zvalues = cube.zvalues - print("Data cube unit: %s" % cube.unit) - print("Image/slice size: %dx%d" % (cube.width, cube.height)) - print("Number of slices: %d" % cube.nslice) - mean = np.zeros(cube.nslice) - std = np.zeros(cube.nslice) - for i in range(cube.nslice): - image = cube.get_slice(i, csize=args.center) - if args.abs: - image = np.abs(image) - threshold = np.percentile(image, q=100*args.threshold) - data = image[image >= threshold] - mean[i] = np.mean(data) - std[i] = np.std(data) - print("Fitting polynomial order: %d" % args.poly_order) - weights = 1.0 / std - pfit = np.polyfit(zvalues, mean, w=weights, deg=args.poly_order) - mean_new = np.polyval(pfit, zvalues) - coef = mean_new / mean - - if args.dryrun: - print("*** DRY RUN MODE ***") - else: - print("Applying slice/channel calibration gains ...") - cube.apply_gain(coef) - print("Saving calibrated FITS cube ...") - cube.write(args.outfile, clobber=args.clobber) - print("Calibrated FITS cube wrote to: %s" % args.outfile) - - print("Slice +/- " + - " ") - for i, z in enumerate(zvalues): - print("* %12.4e: %12.4e %12.4e %12.4e %.6f" % - (z, mean[i], std[i], mean_new[i], coef[i])) - - if args.save_info: - data = np.column_stack([zvalues, mean, std, mean_new, coef]) - header = [ - "Arguments:", - "+ center: %s" % args.center, - "+ abs: %s" % args.abs, - "+ threshold (percentile): %.2f" % args.threshold, - "+ polynomial_order: %d" % args.poly_order, - "", - "Columns:", - "1. z/frequency: z-axis position / frequency [%s]" % cube.zunit, - "2. mean.old: mean before calibration [%s]" % cube.unit, - "3. std.old: standard deviation before calibration", - "4. mean.new: mean after calibration", - "5. gain_coef: calibration coefficient", - "", - ] - infofile = os.path.splitext(args.outfile)[0] + ".txt" - np.savetxt(infofile, data, header="\n".join(header)) - print("Saved calibration information to file: %s" % infofile) - - -def cmd_corrupt(args): - """ - Sub-command: "corrupt", corrupt z-axis slice/channel responses by - applying random gain coefficients. - """ - if not args.clobber and os.path.exists(args.outfile): - raise OSError("output file already exists: %s" % args.outfile) - - cube = FITSCube(args.infile) - zvalues = cube.zvalues - print("Data cube unit: %s" % cube.unit) - print("Image/slice size: %dx%d" % (cube.width, cube.height)) - print("Number of slices: %d" % cube.nslice) - - if args.gaus_sigma is not None: - print("Gaussian sigma: %.1f%%" % args.gaus_sigma) - sigma = args.gaus_sigma * 0.01 - gains = np.random.normal(loc=0.0, scale=sigma, size=cube.nslice) - idx_outliers = np.abs(gains) > 3*sigma - gains[idx_outliers] = np.sign(gains[idx_outliers]) * (3*sigma) - gains += 1.0 - else: - print("Use corruption information from file: %s" % args.infofile) - args.save_info = False # ``--info-file`` discards ``--save-info`` - crpdata = np.loadtxt(args.infofile) - gains = crpdata[:, 1] - - print("Applying slice/channel corruptions ...") - cube.apply_gain(gains) - print("Saving corrupted FITS cube ...") - cube.write(args.outfile, clobber=args.clobber) - print("Corrupted FITS cube wrote to: %s" % args.outfile) - - print("Slice ") - for z, g in zip(zvalues, gains): - print("* %12.4e: %.6f" % (z, g)) - if args.save_info: - data = np.column_stack([zvalues, gains]) - header = [ - "Arguments:", - "+ gaus_sigma: %.1f%%" % args.gaus_sigma, - "", - "Columns:", - "1. z/frequency: z-axis position / frequency [%s]" % cube.zunit, - "2. gain_corruption: corruption coefficient", - "", - ] - infofile = os.path.splitext(args.outfile)[0] + ".txt" - np.savetxt(infofile, data, header="\n".join(header)) - print("Saved corruption information to file: %s" % infofile) - - -def cmd_pool(args): - """ - Sub-command: "pool", down-sample image cube along the spatial dimension. - """ - if not args.clobber and os.path.exists(args.outfile): - raise OSError("output file already exists: %s" % args.outfile) - - cube = FITSCube(args.infile) - print("Data cube unit: %s" % cube.unit) - print("Image/slice size: %dx%d" % (cube.width, cube.height)) - print("Number of slices: %d" % cube.nslice) - - print("Pooling image cube ...") - print("block size: %d, method: %s" % (args.blocksize, args.method)) - cube.pool(blocksize=args.blocksize, func=getattr(np, args.method)) - print("Pooled image/slice size: %dx%d" % (cube.width, cube.height)) - print("Saving pooled FITS cube ...") - cube.write(args.outfile, clobber=args.clobber) - print("Pooled FITS cube wrote to: %s" % args.outfile) - - -def main(): - parser = argparse.ArgumentParser( - description="FITS image cube manipulation tool") - subparsers = parser.add_subparsers(dest="subparser_name", - title="sub-commands", - help="additional help") - - # sub-command: "info" - parser_info = subparsers.add_parser("info", help="show FITS cube info") - parser_info.add_argument("-a", "--abs", action="store_true", - help="take absolute values for image pixels") - parser_info.add_argument("-c", "--center", type=int, - help="crop the central box region of specified " + - "size to calculate the statistics") - parser_info.add_argument("-s", "--stats", action="store_true", - help="calculate statistics (mean, std, rms) " - "for each slice") - parser_info.add_argument("-o", "--outfile", - help="outfile to save the statistics") - parser_info.add_argument("infile", help="FITS cube filename") - parser_info.set_defaults(func=cmd_info) - - # sub-command: "create" - parser_create = subparsers.add_parser( - "create", aliases=["new"], - help="create a FITS cube from a series of images/slices") - parser_create.add_argument("-C", "--clobber", action="store_true", - help="overwrite existing output file") - parser_create.add_argument("-U", "--data-unit", dest="unit", - help="cube data unit (will overwrite the " + - "slice data unit)") - parser_create.add_argument("-z", "--z-begin", dest="zbegin", - type=float, default=0.0, - help="Z-axis position of the first slice") - parser_create.add_argument("-s", "--z-step", dest="zstep", - type=float, default=1.0, - help="Z-axis step/spacing between slices") - parser_create.add_argument("-u", "--z-unit", dest="zunit", - help="Z-axis unit (e.g., cm, Hz)") - parser_create.add_argument("-o", "--outfile", dest="outfile", - required=True, - help="output FITS cube filename") - parser_create.add_argument("-i", "--infiles", dest="infiles", - nargs="+", required=True, - help="input image slices (in order)") - parser_create.set_defaults(func=cmd_create) - - # sub-command: "crop" - parser_crop = subparsers.add_parser( - "crop", - help="crop the central spatial region of the FITS cube") - parser_crop.add_argument("-C", "--clobber", action="store_true", - help="overwrite existing output file") - parser_crop.add_argument("-n", "--size", type=int, required=True, - help="crop region size (number of pixels)") - parser_crop.add_argument("-o", "--outfile", required=True, - help="output FITS cube filename") - parser_crop.add_argument("-i", "--infile", required=True, - help="input FITS cube") - parser_crop.set_defaults(func=cmd_crop) - - # sub-command: "add" - parser_add = subparsers.add_parser( - "add", - help="add two or more FITS cubes") - parser_add.add_argument("-C", "--clobber", action="store_true", - help="overwrite existing output file") - parser_add.add_argument("-o", "--outfile", required=True, - help="output FITS cube filename") - parser_add.add_argument("-i", "--infiles", nargs="+", required=True, - help="two or more input FITS cubes") - parser_add.set_defaults(func=cmd_add) - - # sub-command: "multiply" - parser_mul = subparsers.add_parser( - "mul", aliases=["multiply"], - help="multiply one FITS cube by another one") - parser_mul.add_argument("-C", "--clobber", action="store_true", - help="overwrite existing output file") - parser_mul.add_argument("-o", "--outfile", required=True, - help="output FITS cube filename") - parser_mul.add_argument("-i", "--infiles", nargs="+", required=True, - help="two or more input FITS cubes") - parser_mul.set_defaults(func=cmd_mul) - - # sub-command: "subtract" - parser_sub = subparsers.add_parser( - "sub", aliases=["subtract"], - help="subtract one FITS cube by another one") - parser_sub.add_argument("-C", "--clobber", action="store_true", - help="overwrite existing output file") - parser_sub.add_argument("-o", "--outfile", required=True, - help="output FITS cube filename") - parser_sub.add_argument("-i", "--infile", required=True, - help="input FITS cube as the minuend") - parser_sub.add_argument("-I", "--infile2", required=True, - help="another input FITS cube as the subtrahend") - parser_sub.set_defaults(func=cmd_sub) - - # sub-command: "divide" - parser_div = subparsers.add_parser( - "div", aliases=["divide"], - help="divide one FITS cube by another one") - parser_div.add_argument("-C", "--clobber", action="store_true", - help="overwrite existing output file") - parser_div.add_argument("-F", "--fill-value", dest="fill_value", - help="value to fill the invalid elements") - parser_div.add_argument("-o", "--outfile", required=True, - help="output FITS cube filename") - parser_div.add_argument("-i", "--infile", required=True, - help="input FITS cube as the dividend") - parser_div.add_argument("-I", "--infile2", required=True, - help="another input FITS cube as the divisor") - parser_div.set_defaults(func=cmd_div) - - # sub-command: "calibrate" - parser_cal = subparsers.add_parser( - "cal", aliases=["calibrate"], - help="calibrate z-axis slice/channel responses by fitting " + - "a polynomial") - parser_cal.add_argument("-n", "--dry-run", dest="dryrun", - action="store_true", - help="dry run mode") - parser_cal.add_argument("-C", "--clobber", dest="clobber", - action="store_true", - help="overwrite existing output file") - parser_cal.add_argument("-c", "--center", dest="center", type=int, - help="crop the central box region of specified " + - "size to calculate the mean/std.") - parser_cal.add_argument("-t", "--threshold", dest="threshold", - type=float, default=0.0, - help="percentile threshold (0 -> 1) and only " + - "considers image pixels with values > threshold " + - "to determine the channel/slice responses; " + - "(default: 0, i.e., all pixels are accounted for)") - parser_cal.add_argument("-a", "--abs", dest="abs", action="store_true", - help="take absolute values for image pixels") - parser_cal.add_argument("-p", "--poly-order", dest="poly_order", - type=int, default=2, - help="order of polynomial used for fitting " + - "(default: 2, i.e., quadratic)") - parser_cal.add_argument("-i", "--infile", dest="infile", required=True, - help="input FITS cube filename") - parser_cal.add_argument("-o", "--outfile", dest="outfile", - help="output calibrated FITS cube (optional " + - "for dry-run model)") - parser_cal.add_argument("--save-info", dest="save_info", - action="store_true", - help="save the calibration information of each " + - "channel/slice to a text file") - parser_cal.set_defaults(func=cmd_calibrate) - - # sub-command: "corrupt" - parser_crp = subparsers.add_parser( - "crp", aliases=["corrupt"], - help="corrupt z-axis slice/channel responses by applying " + - "random gain coefficients") - exgrp_crp = parser_crp.add_mutually_exclusive_group(required=True) - exgrp_crp.add_argument("-G", "--gaus-sigma", dest="gaus_sigma", type=float, - help="Gaussian sigma in percent from which " + - "random gain coefficients are sampled; " + - "specified in percent (e.g., 1 for 1%%)") - exgrp_crp.add_argument("-I", "--info-file", dest="infofile", - help="use the gain coefficients from a " + - "(previously saved) corruption information " + - "file; will also discard argument --save-info") - parser_crp.add_argument("-C", "--clobber", dest="clobber", - action="store_true", - help="overwrite existing output file") - parser_crp.add_argument("-i", "--infile", dest="infile", required=True, - help="input FITS cube filename") - parser_crp.add_argument("-o", "--outfile", dest="outfile", required=True, - help="output corrupted FITS cube") - parser_crp.add_argument("--save-info", dest="save_info", - action="store_true", - help="save the corruption information of each " + - "channel/slice to a text file") - parser_crp.set_defaults(func=cmd_corrupt) - - # sub-command: "pool" - parser_pool = subparsers.add_parser( - "pool", - help="down-sample image cube along the spatial dimensions") - parser_pool.add_argument("-C", "--clobber", dest="clobber", - action="store_true", - help="overwrite existing output file") - parser_pool.add_argument("-i", "--infile", required=True, - help="input FITS cube filename") - parser_pool.add_argument("-o", "--outfile", required=True, - help="output pooled FITS cube") - parser_pool.add_argument("-n", "--block-size", dest="blocksize", - type=int, required=True, - help="down-sampling block size (i.e., factor)") - parser_pool.add_argument("-m", "--method", default="mean", - choices=["mean", "min", "max"], - help="down-sampling method (default: mean)") - parser_pool.set_defaults(func=cmd_pool) - - args = parser.parse_args() - args.func(args) - - -if __name__ == "__main__": - main() diff --git a/astro/fits/fitsimage.py b/astro/fits/fitsimage.py deleted file mode 100755 index af49f7e..0000000 --- a/astro/fits/fitsimage.py +++ /dev/null @@ -1,522 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (c) 2017-2018 Weitian LI -# MIT license -# - -""" -FITS image manipulate tool. -""" - -import sys -import argparse - -import numpy as np -from astropy.io import fits -from scipy import ndimage - - -class FITSImage: - """ - FITS image class that deals with plain 2D image (NAXIS=2), but also - handles single-frequency single-polarized image cube (NAXIS=3, 4), - e.g., created by WSClean. - """ - def __init__(self, infile, pixelsize=None): - self.infile = infile - with fits.open(infile) as f: - self.header = f[0].header.copy(strip=True) - self.data = f[0].data - self.ndim = self.data.ndim - self.shape = self.data.shape - if pixelsize is not None: - self.pixelsize = pixelsize # [arcsec] - - @property - def bunit(self): - return self.header.get("BUNIT") - - @property - def Nx(self): - """ - Number of pixels along the X axis (i.e., image width) - """ - return self.shape[-1] - - @property - def Ny(self): - """ - Number of pixels along the Y axis (i.e., image height) - """ - return self.shape[-2] - - @property - def image(self): - """ - Deal with single-frequency and single-polarized image cube. - """ - if self.ndim == 2: - # NAXIS=2: [Y, X] - image = self.data[:, :].copy() - elif self.ndim == 3 and self.shape[0] == 1: - # NAXIS=3: [FREQ=1, Y, X] - image = self.data[0, :, :].copy() - elif self.ndim == 4 and self.shape[0] == 1 and self.shape[1] == 1: - # NAXIS=4: [STOKES=1, FREQ=1, Y, X] - image = self.data[0, 0, :, :].copy() - else: - raise ValueError("invalid data shape: {1}".format(self.shape)) - return image - - @image.setter - def image(self, value): - if self.ndim == 2: - # NAXIS=2: [Y, X] - self.data = np.array(value) - elif self.ndim == 3: - # NAXIS=3: [FREQ=1, Y, X] - self.data = np.array(value)[np.newaxis, :, :] - else: - # NAXIS=4: [STOKES=1, FREQ=1, Y, X] - self.data = np.array(value)[np.newaxis, np.newaxis, :, :] - - @property - def pixelsize(self): - """ - Image pixel size, in units of [arcsec] - """ - if hasattr(self, "_pixelsize"): - return self._pixelsize - - try: - return self.header["PixSize"] # [arcsec] - except KeyError: - try: - return abs(self.header["CDELT1"]) * 3600 # [deg] -> [arcsec] - except KeyError: - return None - - @pixelsize.setter - def pixelsize(self, value): - # Unit: [arcsec] - oldvalue = self.pixelsize - self._pixelsize = value - # Update header - self.header["PixSize"] = value # [arcsec] - try: - self.header["CDELT1"] *= value / oldvalue - self.header["CDELT2"] *= value / oldvalue - except KeyError: - pass - - @property - def fov(self): - """ - Image FoV coverage: (fov_x, fov_y) - Unit: [deg] - """ - pixelsize = self.pixelsize - if pixelsize: - return (self.Nx*pixelsize/3600, self.Ny*pixelsize/3600) - else: - return None - - def zoom(self, newsize, order=1): - """ - Zoom the image to the specified ``newsize``, meanwhile the header - information will be updated accordingly to preserve the FoV coverage. - - NOTE - ---- - The image aspect ratio cannot be changed. - - Parameters - ---------- - newsize : (Nx, Ny) or N - The size of the zoomed image. - order : int, optional - The interpolation order, default: 1 - """ - try: - Nx2, Ny2 = newsize - except TypeError: - Nx2 = Ny2 = newsize - zoom = ((Ny2+0.1)/self.Ny, (Nx2+0.1)/self.Nx) - if abs(zoom[0] - zoom[1]) > 1e-3: - raise RuntimeError("image aspect ratio cannot be changed") - - pixelsize_old = self.pixelsize - self.image = ndimage.zoom(self.image, zoom=zoom, order=order) - self.pixelsize = pixelsize_old * (self.Nx / Nx2) - return self.image - - def flip(self, direction): - if direction == "lr": - self.image = np.fliplr(self.image) - elif direction == "ud": - self.image = np.flipud(self.image) - else: - raise ValueError("invalid flip direction: %s" % direction) - return self.image - - def rotate(self, to): - if to == "left": - self.image = np.rot90(self.image, k=-1) - elif to == "right": - self.image = np.rot90(self.image, k=1) - elif to == "180": - self.image = np.rot90(self.image, k=2) - else: - raise ValueError("invalid rotate to: %s" % to) - return self.image - - def write(self, outfile, clobber=False): - self.header.add_history(" ".join(sys.argv)) - hdu = fits.PrimaryHDU(data=self.data, header=self.header) - try: - hdu.writeto(outfile, overwrite=clobber) - except TypeError: - hdu.writeto(outfile, clobber=clobber) - - -def show_info(filename, abs_=None, center=None): - """ - Show FITS image information. - """ - fimage = FITSImage(filename) - print("Image data shape: {0}".format(fimage.shape)) - print("Image size: %dx%d" % (fimage.Nx, fimage.Ny)) - print("Data unit: [%s]" % fimage.bunit) - pixelsize = fimage.pixelsize - if pixelsize: - print("Pixel size: %.1f [arcsec]" % pixelsize) - print("Field of view: (%.2f, %.2f) [deg]" % fimage.fov) - data = fimage.image - if abs_: - data = np.abs(data) - if center: - print("Central box size: %d" % center) - rows, cols = data.shape - rc, cc = rows//2, cols//2 - cs1, cs2 = center//2, (center+1)//2 - data = data[(rc-cs1):(rc+cs2), (cc-cs1):(cc+cs2)] - min_ = np.nanmin(data) - max_ = np.nanmax(data) - mean = np.nanmean(data) - median = np.nanmedian(data) - std = np.nanstd(data) - iqr = np.diff(np.nanpercentile(data, q=(25, 75))) - mad = np.nanmedian(np.abs(data - median)) - rms = np.sqrt(np.nanmean(data**2)) - print("min: %13.6e" % min_) - print("max: %13.6e" % max_) - print("range: %13.6e (max - min)" % (max_ - min_)) - print("mean: %13.6e" % mean) - print("median: %13.6e" % median) - print("std: %13.6e (standard deviation)" % std) - print("iqr: %13.6e (interquartile range)" % iqr) - print("mad: %13.6e (median absolute deviation)" % mad) - print("rms: %13.6e (root-mean-squared)" % rms) - - -def cmd_info(args): - """ - Sub-command: "info", show FITS image information - """ - for fn in args.files: - print(">>> %s <<<" % fn) - show_info(fn, abs_=args.abs, center=args.center) - print("") - - -def cmd_add(args): - """ - Sub-command: "add", add the image by a number or other image(s) - """ - fimage = FITSImage(args.infile) - image = fimage.image - if args.number: - print("Add by number: %g" % args.number) - image += args.number - else: - for fn in args.files: - print("Add by another image from: %s" % fn) - fimage2 = FITSImage(fn) - image += fimage2.image - fimage.image = image - fimage.write(args.outfile, clobber=args.clobber) - print("Saved FITS image to: %s" % args.outfile) - - -def cmd_sub(args): - """ - Sub-command: "sub", subtract the image by a number or other image(s) - """ - fimage = FITSImage(args.infile) - image = fimage.image - if args.number: - print("Subtract by number: %g" % args.number) - image -= args.number - else: - for fn in args.files: - print("Subtract by another image from: %s" % fn) - fimage2 = FITSImage(fn) - image -= fimage2.image - fimage.image = image - fimage.write(args.outfile, clobber=args.clobber) - print("Saved FITS image to: %s" % args.outfile) - - -def cmd_mul(args): - """ - Sub-command: "mul", multiply the image by a number or other image(s) - """ - fimage = FITSImage(args.infile) - image = fimage.image - if args.number: - print("Multiply by number: %g" % args.number) - image *= args.number - else: - for fn in args.files: - print("Multiply by another image from: %s" % fn) - fimage2 = FITSImage(fn) - image *= fimage2.image - fimage.image = image - fimage.write(args.outfile, clobber=args.clobber) - print("Saved FITS image to: %s" % args.outfile) - - -def cmd_div(args): - """ - Sub-command: "div", divide the image by a number or other image(s) - """ - fimage = FITSImage(args.infile) - image = fimage.image - if args.number: - print("Divide by number: %g" % args.number) - image /= args.number - else: - for fn in args.files: - print("Divide by another image from: %s" % fn) - fimage2 = FITSImage(fn) - with np.errstate(divide="warn"): - image /= fimage2.image - - if args.fill_value: - print("Filling invalid data with: %s" % args.fill_value) - image[~np.isfinite(image)] = float(args.fill_value) - fimage.image = image - fimage.write(args.outfile, clobber=args.clobber) - print("Saved FITS image to: %s" % args.outfile) - - -def cmd_zoom(args): - """ - Sub-command: "zoom", zoom the image to a new size with FoV coverage - preserved. - """ - fimage = FITSImage(args.infile) - print("Image size: %dx%d" % (fimage.Nx, fimage.Ny)) - pixelsize = fimage.pixelsize - if pixelsize is None: - raise RuntimeError("--pixelsize required") - else: - print("Pixel size: %.1f [arcsec]" % pixelsize) - print("Field of view: (%.2f, %.2f) [deg]" % fimage.fov) - - print("Zooming image ...") - print("Interpolation order: %d" % args.order) - print("Zoomed image size: %dx%d" % (args.size, args.size)) - fimage.zoom(newsize=args.size, order=args.order) - print("Zoomed image pixel size: %.1f [arcsec]" % fimage.pixelsize) - fimage.write(args.outfile, clobber=args.clobber) - print("Saved zoomed FITS image to: %s" % args.outfile) - - -def cmd_flip(args): - """ - Sub-command: "flip", flip the image left-right or up-down. - """ - fimage = FITSImage(args.infile) - print("Flipping image ...") - direction = "lr" if args.lr else "ud" - print("Flip direction: %s" % direction) - fimage.flip(direction) - fimage.write(args.outfile, clobber=args.clobber) - print("Saved flipped FITS image to: %s" % args.outfile) - - -def cmd_rotate(args): - """ - Sub-command: "rotate", rotate the image. - """ - fimage = FITSImage(args.infile) - print("Rotating image ...") - if args.left: - to = "left" - elif args.right: - to = "right" - else: - to = "180" - print("Rotate to: %s" % to) - fimage.rotate(to) - fimage.write(args.outfile, clobber=args.clobber) - print("Saved rotated FITS image to: %s" % args.outfile) - - -def main(): - parser = argparse.ArgumentParser( - description="FITS image manipulation tool") - subparsers = parser.add_subparsers(dest="subparser_name", - title="sub-commands", - help="additional help") - - # sub-command: "info" - parser_info = subparsers.add_parser( - "info", aliases=["show"], - help="show FITS image info") - parser_info.add_argument("-c", "--center", dest="center", type=int, - help="choose central region of specified size") - parser_info.add_argument("-a", "--abs", dest="abs", action="store_true", - help="take absolute values of image pixels") - parser_info.add_argument("files", nargs="+", help="FITS image filename") - parser_info.set_defaults(func=cmd_info) - - # sub-command: "add" - parser_add = subparsers.add_parser( - "add", - help="add the image by a number or other image(s)") - parser_add.add_argument("-C", "--clobber", dest="clobber", - action="store_true", - help="overwrite existing output file") - parser_add.add_argument("-i", "--infile", dest="infile", required=True, - help="input FITS image") - parser_add.add_argument("-o", "--outfile", dest="outfile", required=True, - help="output FITS image") - exgrp_add = parser_add.add_mutually_exclusive_group(required=True) - exgrp_add.add_argument("-n", "--number", dest="number", type=float, - help="number to be added by") - exgrp_add.add_argument("-f", "--files", dest="files", nargs="+", - help="FITS image(s) to be added by") - parser_add.set_defaults(func=cmd_add) - - # sub-command: "sub" - parser_sub = subparsers.add_parser( - "sub", aliases=["subtract"], - help="subtract the image by a number or other image(s)") - parser_sub.add_argument("-C", "--clobber", dest="clobber", - action="store_true", - help="overwrite existing output file") - parser_sub.add_argument("-i", "--infile", dest="infile", required=True, - help="input FITS image") - parser_sub.add_argument("-o", "--outfile", dest="outfile", required=True, - help="output FITS image") - exgrp_sub = parser_sub.add_mutually_exclusive_group(required=True) - exgrp_sub.add_argument("-n", "--number", dest="number", type=float, - help="number to be subtracted by") - exgrp_sub.add_argument("-f", "--files", dest="files", nargs="+", - help="FITS image(s) to be subtracted by") - parser_sub.set_defaults(func=cmd_sub) - - # sub-command: "mul" - parser_mul = subparsers.add_parser( - "mul", aliases=["multiply"], - help="multiply the image by a number or other image(s)") - parser_mul.add_argument("-C", "--clobber", dest="clobber", - action="store_true", - help="overwrite existing output file") - parser_mul.add_argument("-i", "--infile", dest="infile", required=True, - help="input FITS image") - parser_mul.add_argument("-o", "--outfile", dest="outfile", required=True, - help="output FITS image") - exgrp_mul = parser_mul.add_mutually_exclusive_group(required=True) - exgrp_mul.add_argument("-n", "--number", dest="number", type=float, - help="number to be multiplied by") - exgrp_mul.add_argument("-f", "--files", dest="files", nargs="+", - help="FITS image(s) to be multiplied by") - parser_mul.set_defaults(func=cmd_mul) - - # sub-command: "div" - parser_div = subparsers.add_parser( - "div", aliases=["divide"], - help="divide the image by a number or other image(s)") - parser_div.add_argument("-C", "--clobber", dest="clobber", - action="store_true", - help="overwrite existing output file") - parser_div.add_argument("-F", "--fill-value", dest="fill_value", - help="value to fill the invalid elements") - parser_div.add_argument("-i", "--infile", dest="infile", required=True, - help="input FITS image") - parser_div.add_argument("-o", "--outfile", dest="outfile", required=True, - help="output FITS image") - exgrp_div = parser_div.add_mutually_exclusive_group(required=True) - exgrp_div.add_argument("-n", "--number", dest="number", type=float, - help="number to be divided by") - exgrp_div.add_argument("-f", "--files", dest="files", nargs="+", - help="FITS image(s) to be divided by") - parser_div.set_defaults(func=cmd_div) - - # sub-command: "zoom" - parser_zoom = subparsers.add_parser( - "zoom", aliases=["rescale"], - help="zoom the image to a new size with FoV coverage preserved") - parser_zoom.add_argument("-C", "--clobber", dest="clobber", - action="store_true", - help="overwrite existing output file") - parser_zoom.add_argument("--order", type=int, default=1, - help="zoom interpolation order (default: 1)") - parser_zoom.add_argument("-s", "--size", type=int, required=True, - help="zoomed image size (number of pixels)") - parser_zoom.add_argument("-p", "--pixelsize", type=float, - help="input FITS image pixel size [arcsec] " + - "(default: try to obtain from FITS header)") - parser_zoom.add_argument("-i", "--infile", dest="infile", required=True, - help="input FITS image") - parser_zoom.add_argument("-o", "--outfile", dest="outfile", required=True, - help="output zoomed FITS image") - parser_zoom.set_defaults(func=cmd_zoom) - - # sub-command: "flip" - parser_flip = subparsers.add_parser( - "flip", help="flip the image left-right or up-down") - parser_flip.add_argument("-C", "--clobber", action="store_true", - help="overwrite existing output file") - parser_flip.add_argument("-i", "--infile", required=True, - help="input FITS image") - parser_flip.add_argument("-o", "--outfile", required=True, - help="output flipped FITS image") - exgrp_flip = parser_flip.add_mutually_exclusive_group(required=True) - exgrp_flip.add_argument("-l", "--left-right", dest="lr", - action="store_true", - help="flip in the left/right direction") - exgrp_flip.add_argument("-u", "--up-down", dest="ud", - action="store_true", - help="flip in the left/right direction") - parser_flip.set_defaults(func=cmd_flip) - - # sub-command: "rotate" - parser_rot = subparsers.add_parser( - "rot", aliases=["rotate"], - help="rotate the image") - parser_rot.add_argument("-C", "--clobber", action="store_true", - help="overwrite existing output file") - parser_rot.add_argument("-i", "--infile", required=True, - help="input FITS image") - parser_rot.add_argument("-o", "--outfile", required=True, - help="output rotated FITS image") - exgrp_rot = parser_rot.add_mutually_exclusive_group(required=True) - exgrp_rot.add_argument("-l", "--left", action="store_true", - help="rotate left") - exgrp_rot.add_argument("-r", "--right", action="store_true", - help="rotate right") - exgrp_rot.add_argument("-u", "--180", dest="ud", - action="store_true", - help="rotate 180 degree") - parser_rot.set_defaults(func=cmd_rotate) - - args = parser.parse_args() - args.func(args) - - -if __name__ == "__main__": - main() diff --git a/astro/fitscube.py b/astro/fitscube.py new file mode 100755 index 0000000..404425d --- /dev/null +++ b/astro/fitscube.py @@ -0,0 +1,788 @@ +#!/usr/bin/env python3 +# +# Copyright (c) 2017-2018 Weitian LI +# MIT license +# + +""" +FITS image cube manipulation tool. + +This tool was originally developed to create a FITS image cube from a +series of CT scan slices to help better visualize/examine them in the +sophisticated SAOImage DS9 software. Each slice in the cube is a CT +image at a position from the CT scan, with the z-axis tracking the slice +positions (equal-distant) in units of, e.g., [cm]. + +Then this tool was significantly improved to deal with the spectral cube +in radio astronomy, with each slice representing the radio sky at a +certain frequency (channel), so the z-axis records the frequency in +units of [Hz]. + +For example, we simulate the observed image using OSKAR and WSClean one +frequency channel at a time, then use this tool to combine them into +a spectral cube, from which the 2D and 1D power spectra is derived. + +The ``calibrate`` sub-command is used to calibrate the frequency channel +responses to make them spectrally smooth by fitting a low-order polynomial. + +The ``corrupt`` sub-command is used to corrupt the frequency channel +responses to simulate that real instrument suffers from calibration +imperfections. +""" + +import os +import sys +import argparse +from datetime import datetime, timezone +from functools import lru_cache + +import numpy as np +from astropy.io import fits +from astropy.wcs import WCS + + +class FITSCube: + """ + FITS image cube. + """ + def __init__(self, infile=None): + if infile is not None: + self.load(infile) + + def load(self, infile): + with fits.open(infile) as f: + self.data = f[0].data + self.header = f[0].header + print("Loaded FITS cube from file: %s" % infile) + print("Cube dimensions: %dx%dx%d" % + (self.width, self.height, self.nslice)) + # The Z-axis position of the first slice. + self.zbegin = self.header["CRVAL3"] + # The Z-axis step/spacing between slices. + self.zstep = self.header["CDELT3"] + + def add_slices(self, infiles, zbegin=0.0, zstep=1.0): + """ + Create a FITS cube from input image slices. + """ + self.infiles = infiles + self.zbegin = zbegin + self.zstep = zstep + nslice = len(infiles) + header, image = self.open_image(infiles[0]) + shape = (nslice, ) + image.shape + data = np.zeros(shape, dtype=image.dtype) + for i, fn in enumerate(infiles): + print("[%d/%d] Adding image slice: %s ..." % (i+1, nslice, fn)) + hdr, img = self.open_image(fn) + data[i, :, :] = img + self.data = data + self.header = header.copy(strip=True) + print("Created FITS cube of dimensions: %dx%dx%d" % + (self.width, self.height, self.nslice)) + + @staticmethod + def open_image(infile): + """ + Open the slice image and return its header and 2D image data. + + NOTE + ---- + The input slice image may have following dimensions: + * NAXIS=2: [Y, X] + * NAXIS=3: [FREQ=1, Y, X] + * NAXIS=4: [STOKES=1, FREQ=1, Y, X] + + NOTE + ---- + Only open slice image that has only ONE frequency and ONE Stokes + parameter. + + Returns + ------- + header : `~astropy.io.fits.Header` + image : 2D `~numpy.ndarray` + The 2D [Y, X] image part of the slice image. + """ + with fits.open(infile) as f: + header = f[0].header + data = f[0].data + if data.ndim == 2: + # NAXIS=2: [Y, X] + image = data + elif data.ndim == 3 and data.shape[0] == 1: + # NAXIS=3: [FREQ=1, Y, X] + image = data[0, :, :] + elif data.ndim == 4 and data.shape[0] == 1 and data.shape[1] == 1: + # NAXIS=4: [STOKES=1, FREQ=1, Y, X] + image = data[0, 0, :, :] + else: + raise ValueError("Slice '{0}' has invalid dimensions: {1}".format( + infile, data.shape)) + return (header, image) + + @property + def header(self): + if not hasattr(self, "header_"): + self.header_ = fits.Header() + return self.header_ + + @header.setter + def header(self, value): + self.header_ = value + for key in ["CTYPE4", "CRPIX4", "CRVAL4", "CDELT4", "CUNIT4"]: + try: + del self.header_[key] + except KeyError: + pass + + @property + @lru_cache() + def wcs(self): + w = WCS(naxis=3) + w.wcs.ctype = ["pixel", "pixel", "pixel"] + w.wcs.crpix = np.array([self.header.get("CRPIX1", 1.0), + self.header.get("CRPIX2", 1.0), + 1.0]) + w.wcs.crval = np.array([self.header.get("CRVAL1", 0.0), + self.header.get("CRVAL2", 0.0), + self.zbegin]) + w.wcs.cdelt = np.array([self.header.get("CDELT1", 1.0), + self.header.get("CDELT2", 1.0), + self.zstep]) + return w + + def keyword(self, key, value=None, comment=None): + header = self.header + if value is None: + return header[key] + else: + header[key] = (value, comment) + + def write(self, outfile, clobber=False): + header = self.header + header.extend(self.wcs.to_header(), update=True) + header["DATE"] = (datetime.now(timezone.utc).astimezone().isoformat(), + "File creation date") + header.add_history(" ".join(sys.argv)) + hdu = fits.PrimaryHDU(data=self.data, header=header) + try: + hdu.writeto(outfile, overwrite=clobber) + except TypeError: + hdu.writeto(outfile, clobber=clobber) + + @property + def width(self): + __, __, w = self.data.shape + return w + + @property + def height(self): + __, h, __ = self.data.shape + return h + + @property + def nslice(self): + ns, __, __ = self.data.shape + return ns + + @property + @lru_cache() + def zvalues(self): + """ + Calculate the Z-axis positions for all slices + """ + nslice = self.nslice + wcs = self.wcs + pix = np.zeros(shape=(nslice, 3), dtype=int) + pix[:, 2] = np.arange(nslice) + world = wcs.wcs_pix2world(pix, 0) + return world[:, 2] + + @property + def slices(self): + """ + A list of slices in the cube w.r.t. ``zvalues``. + """ + return (self.data[i, :, :] for i in range(self.nslice)) + + def get_slice(self, i, csize=None): + """ + Get the i-th (0-based) slice image, and crop out the central box + of size ``csize`` if specified. + """ + if csize is None: + return self.data[i, :, :] + else: + rows, cols = self.height, self.width + rc, cc = rows//2, cols//2 + cs1, cs2 = csize//2, (csize+1)//2 + return self.data[i, (rc-cs1):(rc+cs2), (cc-cs1):(cc+cs2)] + + def apply_gain(self, gain): + """ + Multiply the supplied ``gain`` to each slice, to achieve slice + or channel response calibration or corruption. + """ + gain = np.asarray(gain) + self.data *= gain[:, np.newaxis, np.newaxis] + + def pool(self, blocksize, func=np.mean): + """ + Down-sampling the images by pooling + """ + try: + from skimage.measure import block_reduce + except ImportError: + print("scikit-image not installed") + raise + + self.data = block_reduce(self.data, + block_size=(1, blocksize, blocksize), + func=func) + self.keyword(key="POOL_BS", value=blocksize, + comment="down-sampling block size") + self.keyword(key="POOL_FUN", value=func.__name__, + comment="down-sampling function/method") + + @property + def unit(self): + """ + Cube data unit. + """ + return self.header.get("BUNIT") + + @unit.setter + def unit(self, value): + self.header["BUNIT"] = value + + @property + def zunit(self): + """ + Unit of the slice z-axis positions. + """ + return self.header.get("CUNIT3") + + @zunit.setter + def zunit(self, value): + self.header["CUNIT3"] = value + + +def cmd_info(args): + """ + Sub-command: "info", show FITS cube information + """ + cube = FITSCube(args.infile) + if cube.zunit: + pzunit = " [%s]" % cube.zunit + else: + pzunit = "" + zvalues = cube.zvalues + nslice = cube.nslice + print("Data cube unit: %s" % cube.unit) + print("Image/slice size: %dx%d" % (cube.width, cube.height)) + print("Number of slices: %d" % nslice) + print("Slice step/spacing: %s%s" % (cube.zstep, pzunit)) + print("Slice positions: %s <-> %s%s" % + (zvalues.min(), zvalues.max(), pzunit)) + if args.stats: + mean = np.zeros(nslice) + std = np.zeros(nslice) + rms = np.zeros(nslice) + for i in range(nslice): + image = cube.get_slice(i, csize=args.center) + if args.abs: + image = np.abs(image) + mean[i] = np.mean(image) + std[i] = np.std(image) + rms[i] = np.sqrt(np.mean(image**2)) + print("Slice ") + for i, z in enumerate(zvalues): + print("* %12.4e: %12.4e %12.4e %12.4e" % + (z, mean[i], std[i], rms[i])) + if args.outfile: + data = np.column_stack([zvalues, mean, std, rms]) + np.savetxt(args.outfile, data, header="z mean std rms") + print("Saved statistics data to file: %s" % args.outfile) + + +def cmd_create(args): + """ + Sub-command: "create", create a FITS cube + """ + if not args.clobber and os.path.exists(args.outfile): + raise FileExistsError("output file already exists: %s" % args.outfile) + cube = FITSCube() + cube.add_slices(args.infiles, zbegin=args.zbegin, zstep=args.zstep) + cube.zunit = args.zunit + if args.unit: + cube.unit = args.unit + cube.write(args.outfile, clobber=args.clobber) + print("Created FITS cube: %s" % args.outfile) + + +def cmd_crop(args): + """ + Sub-command: "crop", crop the central region of a FITS cube + """ + if not args.clobber and os.path.exists(args.outfile): + raise FileExistsError("output file already exists: %s" % args.outfile) + + cube = FITSCube(args.infile) + print("Image/slice size: %dx%d" % (cube.width, cube.height)) + print("Cropping region size: %dx%d" % (args.size, args.size)) + s_width = slice((cube.width - args.size) // 2, + (cube.width + args.size) // 2) + s_height = slice((cube.height - args.size) // 2, + (cube.height + args.size) // 2) + cube.data = cube.data[:, s_height, s_width] + print("Saving FITS cube ...") + cube.write(args.outfile, clobber=args.clobber) + print("Created FITS cube: %s" % args.outfile) + + +def cmd_add(args): + """ + Sub-command: "add", add two or more FITS cubes + """ + if not args.clobber and os.path.exists(args.outfile): + raise FileExistsError("output file already exists: %s" % args.outfile) + if len(args.infiles) < 2: + raise RuntimeError("Two or more input FITS cubes required") + + cube = FITSCube(args.infiles[0]) + print("Data cube unit: %s" % cube.unit) + print("Image/slice size: %dx%d" % (cube.width, cube.height)) + print("Number of slices: %d" % cube.nslice) + + for f in args.infiles[1:]: + cube2 = FITSCube(f) + assert (cube.unit, cube.zunit) == (cube2.unit, cube2.zunit) + print("Adding cube %s ..." % f) + cube.data = cube.data + cube2.data + + print("Saving FITS cube ...") + cube.write(args.outfile, clobber=args.clobber) + print("Saved FITS cube: %s" % args.outfile) + + +def cmd_mul(args): + """ + Sub-command: "mul", multiply two or more FITS cubes + """ + if not args.clobber and os.path.exists(args.outfile): + raise FileExistsError("output file already exists: %s" % args.outfile) + if len(args.infiles) < 2: + raise RuntimeError("Two or more input FITS cubes required") + + cube = FITSCube(args.infiles[0]) + print("Data cube unit: %s" % cube.unit) + print("Image/slice size: %dx%d" % (cube.width, cube.height)) + print("Number of slices: %d" % cube.nslice) + + for f in args.infiles[1:]: + cube2 = FITSCube(f) + assert (cube.unit, cube.zunit) == (cube2.unit, cube2.zunit) + print("Multiplying cube %s ..." % f) + cube.data = cube.data * cube2.data + + print("Saving FITS cube ...") + cube.write(args.outfile, clobber=args.clobber) + print("Saved FITS cube: %s" % args.outfile) + + +def cmd_sub(args): + """ + Sub-command: "sub", subtract one FITS cube by another one + """ + if not args.clobber and os.path.exists(args.outfile): + raise FileExistsError("output file already exists: %s" % args.outfile) + + cube = FITSCube(args.infile) + print("Data cube unit: %s" % cube.unit) + print("Image/slice size: %dx%d" % (cube.width, cube.height)) + print("Number of slices: %d" % cube.nslice) + + cube2 = FITSCube(args.infile2) + assert (cube.unit, cube.zunit) == (cube2.unit, cube2.zunit) + print("Subtracting cube %s ..." % args.infile2) + cube.data = cube.data - cube2.data + + print("Saving FITS cube ...") + cube.write(args.outfile, clobber=args.clobber) + print("Saved FITS cube: %s" % args.outfile) + + +def cmd_div(args): + """ + Sub-command: "div", divide one FITS cube by another one + """ + if not args.clobber and os.path.exists(args.outfile): + raise FileExistsError("output file already exists: %s" % args.outfile) + + cube = FITSCube(args.infile) + print("Data cube unit: %s" % cube.unit) + print("Image/slice size: %dx%d" % (cube.width, cube.height)) + print("Number of slices: %d" % cube.nslice) + + cube2 = FITSCube(args.infile2) + assert (cube.unit, cube.zunit) == (cube2.unit, cube2.zunit) + print("Dividing cube %s ..." % args.infile2) + with np.errstate(divide='warn'): + cube.data = cube.data / cube2.data + + if args.fill_value: + print("Filling invalid data with: %s" % args.fill_value) + cube.data[~np.isfinite(cube.data)] = float(args.fill_value) + + print("Saving FITS cube ...") + cube.write(args.outfile, clobber=args.clobber) + print("Saved FITS cube: %s" % args.outfile) + + +def cmd_calibrate(args): + """ + Sub-command: "calibrate", calibrate the z-axis slice/channel responses + by fitting a polynomial. + """ + if not args.dryrun: + if args.outfile is None: + raise ValueError("--outfile required") + elif not args.clobber and os.path.exists(args.outfile): + raise OSError("output file already exists: %s" % args.outfile) + + cube = FITSCube(args.infile) + zvalues = cube.zvalues + print("Data cube unit: %s" % cube.unit) + print("Image/slice size: %dx%d" % (cube.width, cube.height)) + print("Number of slices: %d" % cube.nslice) + mean = np.zeros(cube.nslice) + std = np.zeros(cube.nslice) + for i in range(cube.nslice): + image = cube.get_slice(i, csize=args.center) + if args.abs: + image = np.abs(image) + threshold = np.percentile(image, q=100*args.threshold) + data = image[image >= threshold] + mean[i] = np.mean(data) + std[i] = np.std(data) + print("Fitting polynomial order: %d" % args.poly_order) + weights = 1.0 / std + pfit = np.polyfit(zvalues, mean, w=weights, deg=args.poly_order) + mean_new = np.polyval(pfit, zvalues) + coef = mean_new / mean + + if args.dryrun: + print("*** DRY RUN MODE ***") + else: + print("Applying slice/channel calibration gains ...") + cube.apply_gain(coef) + print("Saving calibrated FITS cube ...") + cube.write(args.outfile, clobber=args.clobber) + print("Calibrated FITS cube wrote to: %s" % args.outfile) + + print("Slice +/- " + + " ") + for i, z in enumerate(zvalues): + print("* %12.4e: %12.4e %12.4e %12.4e %.6f" % + (z, mean[i], std[i], mean_new[i], coef[i])) + + if args.save_info: + data = np.column_stack([zvalues, mean, std, mean_new, coef]) + header = [ + "Arguments:", + "+ center: %s" % args.center, + "+ abs: %s" % args.abs, + "+ threshold (percentile): %.2f" % args.threshold, + "+ polynomial_order: %d" % args.poly_order, + "", + "Columns:", + "1. z/frequency: z-axis position / frequency [%s]" % cube.zunit, + "2. mean.old: mean before calibration [%s]" % cube.unit, + "3. std.old: standard deviation before calibration", + "4. mean.new: mean after calibration", + "5. gain_coef: calibration coefficient", + "", + ] + infofile = os.path.splitext(args.outfile)[0] + ".txt" + np.savetxt(infofile, data, header="\n".join(header)) + print("Saved calibration information to file: %s" % infofile) + + +def cmd_corrupt(args): + """ + Sub-command: "corrupt", corrupt z-axis slice/channel responses by + applying random gain coefficients. + """ + if not args.clobber and os.path.exists(args.outfile): + raise OSError("output file already exists: %s" % args.outfile) + + cube = FITSCube(args.infile) + zvalues = cube.zvalues + print("Data cube unit: %s" % cube.unit) + print("Image/slice size: %dx%d" % (cube.width, cube.height)) + print("Number of slices: %d" % cube.nslice) + + if args.gaus_sigma is not None: + print("Gaussian sigma: %.1f%%" % args.gaus_sigma) + sigma = args.gaus_sigma * 0.01 + gains = np.random.normal(loc=0.0, scale=sigma, size=cube.nslice) + idx_outliers = np.abs(gains) > 3*sigma + gains[idx_outliers] = np.sign(gains[idx_outliers]) * (3*sigma) + gains += 1.0 + else: + print("Use corruption information from file: %s" % args.infofile) + args.save_info = False # ``--info-file`` discards ``--save-info`` + crpdata = np.loadtxt(args.infofile) + gains = crpdata[:, 1] + + print("Applying slice/channel corruptions ...") + cube.apply_gain(gains) + print("Saving corrupted FITS cube ...") + cube.write(args.outfile, clobber=args.clobber) + print("Corrupted FITS cube wrote to: %s" % args.outfile) + + print("Slice ") + for z, g in zip(zvalues, gains): + print("* %12.4e: %.6f" % (z, g)) + if args.save_info: + data = np.column_stack([zvalues, gains]) + header = [ + "Arguments:", + "+ gaus_sigma: %.1f%%" % args.gaus_sigma, + "", + "Columns:", + "1. z/frequency: z-axis position / frequency [%s]" % cube.zunit, + "2. gain_corruption: corruption coefficient", + "", + ] + infofile = os.path.splitext(args.outfile)[0] + ".txt" + np.savetxt(infofile, data, header="\n".join(header)) + print("Saved corruption information to file: %s" % infofile) + + +def cmd_pool(args): + """ + Sub-command: "pool", down-sample image cube along the spatial dimension. + """ + if not args.clobber and os.path.exists(args.outfile): + raise OSError("output file already exists: %s" % args.outfile) + + cube = FITSCube(args.infile) + print("Data cube unit: %s" % cube.unit) + print("Image/slice size: %dx%d" % (cube.width, cube.height)) + print("Number of slices: %d" % cube.nslice) + + print("Pooling image cube ...") + print("block size: %d, method: %s" % (args.blocksize, args.method)) + cube.pool(blocksize=args.blocksize, func=getattr(np, args.method)) + print("Pooled image/slice size: %dx%d" % (cube.width, cube.height)) + print("Saving pooled FITS cube ...") + cube.write(args.outfile, clobber=args.clobber) + print("Pooled FITS cube wrote to: %s" % args.outfile) + + +def main(): + parser = argparse.ArgumentParser( + description="FITS image cube manipulation tool") + subparsers = parser.add_subparsers(dest="subparser_name", + title="sub-commands", + help="additional help") + + # sub-command: "info" + parser_info = subparsers.add_parser("info", help="show FITS cube info") + parser_info.add_argument("-a", "--abs", action="store_true", + help="take absolute values for image pixels") + parser_info.add_argument("-c", "--center", type=int, + help="crop the central box region of specified " + + "size to calculate the statistics") + parser_info.add_argument("-s", "--stats", action="store_true", + help="calculate statistics (mean, std, rms) " + "for each slice") + parser_info.add_argument("-o", "--outfile", + help="outfile to save the statistics") + parser_info.add_argument("infile", help="FITS cube filename") + parser_info.set_defaults(func=cmd_info) + + # sub-command: "create" + parser_create = subparsers.add_parser( + "create", aliases=["new"], + help="create a FITS cube from a series of images/slices") + parser_create.add_argument("-C", "--clobber", action="store_true", + help="overwrite existing output file") + parser_create.add_argument("-U", "--data-unit", dest="unit", + help="cube data unit (will overwrite the " + + "slice data unit)") + parser_create.add_argument("-z", "--z-begin", dest="zbegin", + type=float, default=0.0, + help="Z-axis position of the first slice") + parser_create.add_argument("-s", "--z-step", dest="zstep", + type=float, default=1.0, + help="Z-axis step/spacing between slices") + parser_create.add_argument("-u", "--z-unit", dest="zunit", + help="Z-axis unit (e.g., cm, Hz)") + parser_create.add_argument("-o", "--outfile", dest="outfile", + required=True, + help="output FITS cube filename") + parser_create.add_argument("-i", "--infiles", dest="infiles", + nargs="+", required=True, + help="input image slices (in order)") + parser_create.set_defaults(func=cmd_create) + + # sub-command: "crop" + parser_crop = subparsers.add_parser( + "crop", + help="crop the central spatial region of the FITS cube") + parser_crop.add_argument("-C", "--clobber", action="store_true", + help="overwrite existing output file") + parser_crop.add_argument("-n", "--size", type=int, required=True, + help="crop region size (number of pixels)") + parser_crop.add_argument("-o", "--outfile", required=True, + help="output FITS cube filename") + parser_crop.add_argument("-i", "--infile", required=True, + help="input FITS cube") + parser_crop.set_defaults(func=cmd_crop) + + # sub-command: "add" + parser_add = subparsers.add_parser( + "add", + help="add two or more FITS cubes") + parser_add.add_argument("-C", "--clobber", action="store_true", + help="overwrite existing output file") + parser_add.add_argument("-o", "--outfile", required=True, + help="output FITS cube filename") + parser_add.add_argument("-i", "--infiles", nargs="+", required=True, + help="two or more input FITS cubes") + parser_add.set_defaults(func=cmd_add) + + # sub-command: "multiply" + parser_mul = subparsers.add_parser( + "mul", aliases=["multiply"], + help="multiply one FITS cube by another one") + parser_mul.add_argument("-C", "--clobber", action="store_true", + help="overwrite existing output file") + parser_mul.add_argument("-o", "--outfile", required=True, + help="output FITS cube filename") + parser_mul.add_argument("-i", "--infiles", nargs="+", required=True, + help="two or more input FITS cubes") + parser_mul.set_defaults(func=cmd_mul) + + # sub-command: "subtract" + parser_sub = subparsers.add_parser( + "sub", aliases=["subtract"], + help="subtract one FITS cube by another one") + parser_sub.add_argument("-C", "--clobber", action="store_true", + help="overwrite existing output file") + parser_sub.add_argument("-o", "--outfile", required=True, + help="output FITS cube filename") + parser_sub.add_argument("-i", "--infile", required=True, + help="input FITS cube as the minuend") + parser_sub.add_argument("-I", "--infile2", required=True, + help="another input FITS cube as the subtrahend") + parser_sub.set_defaults(func=cmd_sub) + + # sub-command: "divide" + parser_div = subparsers.add_parser( + "div", aliases=["divide"], + help="divide one FITS cube by another one") + parser_div.add_argument("-C", "--clobber", action="store_true", + help="overwrite existing output file") + parser_div.add_argument("-F", "--fill-value", dest="fill_value", + help="value to fill the invalid elements") + parser_div.add_argument("-o", "--outfile", required=True, + help="output FITS cube filename") + parser_div.add_argument("-i", "--infile", required=True, + help="input FITS cube as the dividend") + parser_div.add_argument("-I", "--infile2", required=True, + help="another input FITS cube as the divisor") + parser_div.set_defaults(func=cmd_div) + + # sub-command: "calibrate" + parser_cal = subparsers.add_parser( + "cal", aliases=["calibrate"], + help="calibrate z-axis slice/channel responses by fitting " + + "a polynomial") + parser_cal.add_argument("-n", "--dry-run", dest="dryrun", + action="store_true", + help="dry run mode") + parser_cal.add_argument("-C", "--clobber", dest="clobber", + action="store_true", + help="overwrite existing output file") + parser_cal.add_argument("-c", "--center", dest="center", type=int, + help="crop the central box region of specified " + + "size to calculate the mean/std.") + parser_cal.add_argument("-t", "--threshold", dest="threshold", + type=float, default=0.0, + help="percentile threshold (0 -> 1) and only " + + "considers image pixels with values > threshold " + + "to determine the channel/slice responses; " + + "(default: 0, i.e., all pixels are accounted for)") + parser_cal.add_argument("-a", "--abs", dest="abs", action="store_true", + help="take absolute values for image pixels") + parser_cal.add_argument("-p", "--poly-order", dest="poly_order", + type=int, default=2, + help="order of polynomial used for fitting " + + "(default: 2, i.e., quadratic)") + parser_cal.add_argument("-i", "--infile", dest="infile", required=True, + help="input FITS cube filename") + parser_cal.add_argument("-o", "--outfile", dest="outfile", + help="output calibrated FITS cube (optional " + + "for dry-run model)") + parser_cal.add_argument("--save-info", dest="save_info", + action="store_true", + help="save the calibration information of each " + + "channel/slice to a text file") + parser_cal.set_defaults(func=cmd_calibrate) + + # sub-command: "corrupt" + parser_crp = subparsers.add_parser( + "crp", aliases=["corrupt"], + help="corrupt z-axis slice/channel responses by applying " + + "random gain coefficients") + exgrp_crp = parser_crp.add_mutually_exclusive_group(required=True) + exgrp_crp.add_argument("-G", "--gaus-sigma", dest="gaus_sigma", type=float, + help="Gaussian sigma in percent from which " + + "random gain coefficients are sampled; " + + "specified in percent (e.g., 1 for 1%%)") + exgrp_crp.add_argument("-I", "--info-file", dest="infofile", + help="use the gain coefficients from a " + + "(previously saved) corruption information " + + "file; will also discard argument --save-info") + parser_crp.add_argument("-C", "--clobber", dest="clobber", + action="store_true", + help="overwrite existing output file") + parser_crp.add_argument("-i", "--infile", dest="infile", required=True, + help="input FITS cube filename") + parser_crp.add_argument("-o", "--outfile", dest="outfile", required=True, + help="output corrupted FITS cube") + parser_crp.add_argument("--save-info", dest="save_info", + action="store_true", + help="save the corruption information of each " + + "channel/slice to a text file") + parser_crp.set_defaults(func=cmd_corrupt) + + # sub-command: "pool" + parser_pool = subparsers.add_parser( + "pool", + help="down-sample image cube along the spatial dimensions") + parser_pool.add_argument("-C", "--clobber", dest="clobber", + action="store_true", + help="overwrite existing output file") + parser_pool.add_argument("-i", "--infile", required=True, + help="input FITS cube filename") + parser_pool.add_argument("-o", "--outfile", required=True, + help="output pooled FITS cube") + parser_pool.add_argument("-n", "--block-size", dest="blocksize", + type=int, required=True, + help="down-sampling block size (i.e., factor)") + parser_pool.add_argument("-m", "--method", default="mean", + choices=["mean", "min", "max"], + help="down-sampling method (default: mean)") + parser_pool.set_defaults(func=cmd_pool) + + args = parser.parse_args() + args.func(args) + + +if __name__ == "__main__": + main() diff --git a/astro/fitsimage.py b/astro/fitsimage.py new file mode 100755 index 0000000..af49f7e --- /dev/null +++ b/astro/fitsimage.py @@ -0,0 +1,522 @@ +#!/usr/bin/env python3 +# +# Copyright (c) 2017-2018 Weitian LI +# MIT license +# + +""" +FITS image manipulate tool. +""" + +import sys +import argparse + +import numpy as np +from astropy.io import fits +from scipy import ndimage + + +class FITSImage: + """ + FITS image class that deals with plain 2D image (NAXIS=2), but also + handles single-frequency single-polarized image cube (NAXIS=3, 4), + e.g., created by WSClean. + """ + def __init__(self, infile, pixelsize=None): + self.infile = infile + with fits.open(infile) as f: + self.header = f[0].header.copy(strip=True) + self.data = f[0].data + self.ndim = self.data.ndim + self.shape = self.data.shape + if pixelsize is not None: + self.pixelsize = pixelsize # [arcsec] + + @property + def bunit(self): + return self.header.get("BUNIT") + + @property + def Nx(self): + """ + Number of pixels along the X axis (i.e., image width) + """ + return self.shape[-1] + + @property + def Ny(self): + """ + Number of pixels along the Y axis (i.e., image height) + """ + return self.shape[-2] + + @property + def image(self): + """ + Deal with single-frequency and single-polarized image cube. + """ + if self.ndim == 2: + # NAXIS=2: [Y, X] + image = self.data[:, :].copy() + elif self.ndim == 3 and self.shape[0] == 1: + # NAXIS=3: [FREQ=1, Y, X] + image = self.data[0, :, :].copy() + elif self.ndim == 4 and self.shape[0] == 1 and self.shape[1] == 1: + # NAXIS=4: [STOKES=1, FREQ=1, Y, X] + image = self.data[0, 0, :, :].copy() + else: + raise ValueError("invalid data shape: {1}".format(self.shape)) + return image + + @image.setter + def image(self, value): + if self.ndim == 2: + # NAXIS=2: [Y, X] + self.data = np.array(value) + elif self.ndim == 3: + # NAXIS=3: [FREQ=1, Y, X] + self.data = np.array(value)[np.newaxis, :, :] + else: + # NAXIS=4: [STOKES=1, FREQ=1, Y, X] + self.data = np.array(value)[np.newaxis, np.newaxis, :, :] + + @property + def pixelsize(self): + """ + Image pixel size, in units of [arcsec] + """ + if hasattr(self, "_pixelsize"): + return self._pixelsize + + try: + return self.header["PixSize"] # [arcsec] + except KeyError: + try: + return abs(self.header["CDELT1"]) * 3600 # [deg] -> [arcsec] + except KeyError: + return None + + @pixelsize.setter + def pixelsize(self, value): + # Unit: [arcsec] + oldvalue = self.pixelsize + self._pixelsize = value + # Update header + self.header["PixSize"] = value # [arcsec] + try: + self.header["CDELT1"] *= value / oldvalue + self.header["CDELT2"] *= value / oldvalue + except KeyError: + pass + + @property + def fov(self): + """ + Image FoV coverage: (fov_x, fov_y) + Unit: [deg] + """ + pixelsize = self.pixelsize + if pixelsize: + return (self.Nx*pixelsize/3600, self.Ny*pixelsize/3600) + else: + return None + + def zoom(self, newsize, order=1): + """ + Zoom the image to the specified ``newsize``, meanwhile the header + information will be updated accordingly to preserve the FoV coverage. + + NOTE + ---- + The image aspect ratio cannot be changed. + + Parameters + ---------- + newsize : (Nx, Ny) or N + The size of the zoomed image. + order : int, optional + The interpolation order, default: 1 + """ + try: + Nx2, Ny2 = newsize + except TypeError: + Nx2 = Ny2 = newsize + zoom = ((Ny2+0.1)/self.Ny, (Nx2+0.1)/self.Nx) + if abs(zoom[0] - zoom[1]) > 1e-3: + raise RuntimeError("image aspect ratio cannot be changed") + + pixelsize_old = self.pixelsize + self.image = ndimage.zoom(self.image, zoom=zoom, order=order) + self.pixelsize = pixelsize_old * (self.Nx / Nx2) + return self.image + + def flip(self, direction): + if direction == "lr": + self.image = np.fliplr(self.image) + elif direction == "ud": + self.image = np.flipud(self.image) + else: + raise ValueError("invalid flip direction: %s" % direction) + return self.image + + def rotate(self, to): + if to == "left": + self.image = np.rot90(self.image, k=-1) + elif to == "right": + self.image = np.rot90(self.image, k=1) + elif to == "180": + self.image = np.rot90(self.image, k=2) + else: + raise ValueError("invalid rotate to: %s" % to) + return self.image + + def write(self, outfile, clobber=False): + self.header.add_history(" ".join(sys.argv)) + hdu = fits.PrimaryHDU(data=self.data, header=self.header) + try: + hdu.writeto(outfile, overwrite=clobber) + except TypeError: + hdu.writeto(outfile, clobber=clobber) + + +def show_info(filename, abs_=None, center=None): + """ + Show FITS image information. + """ + fimage = FITSImage(filename) + print("Image data shape: {0}".format(fimage.shape)) + print("Image size: %dx%d" % (fimage.Nx, fimage.Ny)) + print("Data unit: [%s]" % fimage.bunit) + pixelsize = fimage.pixelsize + if pixelsize: + print("Pixel size: %.1f [arcsec]" % pixelsize) + print("Field of view: (%.2f, %.2f) [deg]" % fimage.fov) + data = fimage.image + if abs_: + data = np.abs(data) + if center: + print("Central box size: %d" % center) + rows, cols = data.shape + rc, cc = rows//2, cols//2 + cs1, cs2 = center//2, (center+1)//2 + data = data[(rc-cs1):(rc+cs2), (cc-cs1):(cc+cs2)] + min_ = np.nanmin(data) + max_ = np.nanmax(data) + mean = np.nanmean(data) + median = np.nanmedian(data) + std = np.nanstd(data) + iqr = np.diff(np.nanpercentile(data, q=(25, 75))) + mad = np.nanmedian(np.abs(data - median)) + rms = np.sqrt(np.nanmean(data**2)) + print("min: %13.6e" % min_) + print("max: %13.6e" % max_) + print("range: %13.6e (max - min)" % (max_ - min_)) + print("mean: %13.6e" % mean) + print("median: %13.6e" % median) + print("std: %13.6e (standard deviation)" % std) + print("iqr: %13.6e (interquartile range)" % iqr) + print("mad: %13.6e (median absolute deviation)" % mad) + print("rms: %13.6e (root-mean-squared)" % rms) + + +def cmd_info(args): + """ + Sub-command: "info", show FITS image information + """ + for fn in args.files: + print(">>> %s <<<" % fn) + show_info(fn, abs_=args.abs, center=args.center) + print("") + + +def cmd_add(args): + """ + Sub-command: "add", add the image by a number or other image(s) + """ + fimage = FITSImage(args.infile) + image = fimage.image + if args.number: + print("Add by number: %g" % args.number) + image += args.number + else: + for fn in args.files: + print("Add by another image from: %s" % fn) + fimage2 = FITSImage(fn) + image += fimage2.image + fimage.image = image + fimage.write(args.outfile, clobber=args.clobber) + print("Saved FITS image to: %s" % args.outfile) + + +def cmd_sub(args): + """ + Sub-command: "sub", subtract the image by a number or other image(s) + """ + fimage = FITSImage(args.infile) + image = fimage.image + if args.number: + print("Subtract by number: %g" % args.number) + image -= args.number + else: + for fn in args.files: + print("Subtract by another image from: %s" % fn) + fimage2 = FITSImage(fn) + image -= fimage2.image + fimage.image = image + fimage.write(args.outfile, clobber=args.clobber) + print("Saved FITS image to: %s" % args.outfile) + + +def cmd_mul(args): + """ + Sub-command: "mul", multiply the image by a number or other image(s) + """ + fimage = FITSImage(args.infile) + image = fimage.image + if args.number: + print("Multiply by number: %g" % args.number) + image *= args.number + else: + for fn in args.files: + print("Multiply by another image from: %s" % fn) + fimage2 = FITSImage(fn) + image *= fimage2.image + fimage.image = image + fimage.write(args.outfile, clobber=args.clobber) + print("Saved FITS image to: %s" % args.outfile) + + +def cmd_div(args): + """ + Sub-command: "div", divide the image by a number or other image(s) + """ + fimage = FITSImage(args.infile) + image = fimage.image + if args.number: + print("Divide by number: %g" % args.number) + image /= args.number + else: + for fn in args.files: + print("Divide by another image from: %s" % fn) + fimage2 = FITSImage(fn) + with np.errstate(divide="warn"): + image /= fimage2.image + + if args.fill_value: + print("Filling invalid data with: %s" % args.fill_value) + image[~np.isfinite(image)] = float(args.fill_value) + fimage.image = image + fimage.write(args.outfile, clobber=args.clobber) + print("Saved FITS image to: %s" % args.outfile) + + +def cmd_zoom(args): + """ + Sub-command: "zoom", zoom the image to a new size with FoV coverage + preserved. + """ + fimage = FITSImage(args.infile) + print("Image size: %dx%d" % (fimage.Nx, fimage.Ny)) + pixelsize = fimage.pixelsize + if pixelsize is None: + raise RuntimeError("--pixelsize required") + else: + print("Pixel size: %.1f [arcsec]" % pixelsize) + print("Field of view: (%.2f, %.2f) [deg]" % fimage.fov) + + print("Zooming image ...") + print("Interpolation order: %d" % args.order) + print("Zoomed image size: %dx%d" % (args.size, args.size)) + fimage.zoom(newsize=args.size, order=args.order) + print("Zoomed image pixel size: %.1f [arcsec]" % fimage.pixelsize) + fimage.write(args.outfile, clobber=args.clobber) + print("Saved zoomed FITS image to: %s" % args.outfile) + + +def cmd_flip(args): + """ + Sub-command: "flip", flip the image left-right or up-down. + """ + fimage = FITSImage(args.infile) + print("Flipping image ...") + direction = "lr" if args.lr else "ud" + print("Flip direction: %s" % direction) + fimage.flip(direction) + fimage.write(args.outfile, clobber=args.clobber) + print("Saved flipped FITS image to: %s" % args.outfile) + + +def cmd_rotate(args): + """ + Sub-command: "rotate", rotate the image. + """ + fimage = FITSImage(args.infile) + print("Rotating image ...") + if args.left: + to = "left" + elif args.right: + to = "right" + else: + to = "180" + print("Rotate to: %s" % to) + fimage.rotate(to) + fimage.write(args.outfile, clobber=args.clobber) + print("Saved rotated FITS image to: %s" % args.outfile) + + +def main(): + parser = argparse.ArgumentParser( + description="FITS image manipulation tool") + subparsers = parser.add_subparsers(dest="subparser_name", + title="sub-commands", + help="additional help") + + # sub-command: "info" + parser_info = subparsers.add_parser( + "info", aliases=["show"], + help="show FITS image info") + parser_info.add_argument("-c", "--center", dest="center", type=int, + help="choose central region of specified size") + parser_info.add_argument("-a", "--abs", dest="abs", action="store_true", + help="take absolute values of image pixels") + parser_info.add_argument("files", nargs="+", help="FITS image filename") + parser_info.set_defaults(func=cmd_info) + + # sub-command: "add" + parser_add = subparsers.add_parser( + "add", + help="add the image by a number or other image(s)") + parser_add.add_argument("-C", "--clobber", dest="clobber", + action="store_true", + help="overwrite existing output file") + parser_add.add_argument("-i", "--infile", dest="infile", required=True, + help="input FITS image") + parser_add.add_argument("-o", "--outfile", dest="outfile", required=True, + help="output FITS image") + exgrp_add = parser_add.add_mutually_exclusive_group(required=True) + exgrp_add.add_argument("-n", "--number", dest="number", type=float, + help="number to be added by") + exgrp_add.add_argument("-f", "--files", dest="files", nargs="+", + help="FITS image(s) to be added by") + parser_add.set_defaults(func=cmd_add) + + # sub-command: "sub" + parser_sub = subparsers.add_parser( + "sub", aliases=["subtract"], + help="subtract the image by a number or other image(s)") + parser_sub.add_argument("-C", "--clobber", dest="clobber", + action="store_true", + help="overwrite existing output file") + parser_sub.add_argument("-i", "--infile", dest="infile", required=True, + help="input FITS image") + parser_sub.add_argument("-o", "--outfile", dest="outfile", required=True, + help="output FITS image") + exgrp_sub = parser_sub.add_mutually_exclusive_group(required=True) + exgrp_sub.add_argument("-n", "--number", dest="number", type=float, + help="number to be subtracted by") + exgrp_sub.add_argument("-f", "--files", dest="files", nargs="+", + help="FITS image(s) to be subtracted by") + parser_sub.set_defaults(func=cmd_sub) + + # sub-command: "mul" + parser_mul = subparsers.add_parser( + "mul", aliases=["multiply"], + help="multiply the image by a number or other image(s)") + parser_mul.add_argument("-C", "--clobber", dest="clobber", + action="store_true", + help="overwrite existing output file") + parser_mul.add_argument("-i", "--infile", dest="infile", required=True, + help="input FITS image") + parser_mul.add_argument("-o", "--outfile", dest="outfile", required=True, + help="output FITS image") + exgrp_mul = parser_mul.add_mutually_exclusive_group(required=True) + exgrp_mul.add_argument("-n", "--number", dest="number", type=float, + help="number to be multiplied by") + exgrp_mul.add_argument("-f", "--files", dest="files", nargs="+", + help="FITS image(s) to be multiplied by") + parser_mul.set_defaults(func=cmd_mul) + + # sub-command: "div" + parser_div = subparsers.add_parser( + "div", aliases=["divide"], + help="divide the image by a number or other image(s)") + parser_div.add_argument("-C", "--clobber", dest="clobber", + action="store_true", + help="overwrite existing output file") + parser_div.add_argument("-F", "--fill-value", dest="fill_value", + help="value to fill the invalid elements") + parser_div.add_argument("-i", "--infile", dest="infile", required=True, + help="input FITS image") + parser_div.add_argument("-o", "--outfile", dest="outfile", required=True, + help="output FITS image") + exgrp_div = parser_div.add_mutually_exclusive_group(required=True) + exgrp_div.add_argument("-n", "--number", dest="number", type=float, + help="number to be divided by") + exgrp_div.add_argument("-f", "--files", dest="files", nargs="+", + help="FITS image(s) to be divided by") + parser_div.set_defaults(func=cmd_div) + + # sub-command: "zoom" + parser_zoom = subparsers.add_parser( + "zoom", aliases=["rescale"], + help="zoom the image to a new size with FoV coverage preserved") + parser_zoom.add_argument("-C", "--clobber", dest="clobber", + action="store_true", + help="overwrite existing output file") + parser_zoom.add_argument("--order", type=int, default=1, + help="zoom interpolation order (default: 1)") + parser_zoom.add_argument("-s", "--size", type=int, required=True, + help="zoomed image size (number of pixels)") + parser_zoom.add_argument("-p", "--pixelsize", type=float, + help="input FITS image pixel size [arcsec] " + + "(default: try to obtain from FITS header)") + parser_zoom.add_argument("-i", "--infile", dest="infile", required=True, + help="input FITS image") + parser_zoom.add_argument("-o", "--outfile", dest="outfile", required=True, + help="output zoomed FITS image") + parser_zoom.set_defaults(func=cmd_zoom) + + # sub-command: "flip" + parser_flip = subparsers.add_parser( + "flip", help="flip the image left-right or up-down") + parser_flip.add_argument("-C", "--clobber", action="store_true", + help="overwrite existing output file") + parser_flip.add_argument("-i", "--infile", required=True, + help="input FITS image") + parser_flip.add_argument("-o", "--outfile", required=True, + help="output flipped FITS image") + exgrp_flip = parser_flip.add_mutually_exclusive_group(required=True) + exgrp_flip.add_argument("-l", "--left-right", dest="lr", + action="store_true", + help="flip in the left/right direction") + exgrp_flip.add_argument("-u", "--up-down", dest="ud", + action="store_true", + help="flip in the left/right direction") + parser_flip.set_defaults(func=cmd_flip) + + # sub-command: "rotate" + parser_rot = subparsers.add_parser( + "rot", aliases=["rotate"], + help="rotate the image") + parser_rot.add_argument("-C", "--clobber", action="store_true", + help="overwrite existing output file") + parser_rot.add_argument("-i", "--infile", required=True, + help="input FITS image") + parser_rot.add_argument("-o", "--outfile", required=True, + help="output rotated FITS image") + exgrp_rot = parser_rot.add_mutually_exclusive_group(required=True) + exgrp_rot.add_argument("-l", "--left", action="store_true", + help="rotate left") + exgrp_rot.add_argument("-r", "--right", action="store_true", + help="rotate right") + exgrp_rot.add_argument("-u", "--180", dest="ud", + action="store_true", + help="rotate 180 degree") + parser_rot.set_defaults(func=cmd_rotate) + + args = parser.parse_args() + args.func(args) + + +if __name__ == "__main__": + main() -- cgit v1.2.2