diff options
Diffstat (limited to 'astro/fits')
-rwxr-xr-x | astro/fits/fitscube.py | 788 | ||||
-rwxr-xr-x | astro/fits/fitsimage.py | 522 |
2 files changed, 0 insertions, 1310 deletions
diff --git a/astro/fits/fitscube.py b/astro/fits/fitscube.py deleted file mode 100755 index 404425d..0000000 --- a/astro/fits/fitscube.py +++ /dev/null @@ -1,788 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (c) 2017-2018 Weitian LI <wt@liwt.net> -# MIT license -# - -""" -FITS image cube manipulation tool. - -This tool was originally developed to create a FITS image cube from a -series of CT scan slices to help better visualize/examine them in the -sophisticated SAOImage DS9 software. Each slice in the cube is a CT -image at a position from the CT scan, with the z-axis tracking the slice -positions (equal-distant) in units of, e.g., [cm]. - -Then this tool was significantly improved to deal with the spectral cube -in radio astronomy, with each slice representing the radio sky at a -certain frequency (channel), so the z-axis records the frequency in -units of [Hz]. - -For example, we simulate the observed image using OSKAR and WSClean one -frequency channel at a time, then use this tool to combine them into -a spectral cube, from which the 2D and 1D power spectra is derived. - -The ``calibrate`` sub-command is used to calibrate the frequency channel -responses to make them spectrally smooth by fitting a low-order polynomial. - -The ``corrupt`` sub-command is used to corrupt the frequency channel -responses to simulate that real instrument suffers from calibration -imperfections. -""" - -import os -import sys -import argparse -from datetime import datetime, timezone -from functools import lru_cache - -import numpy as np -from astropy.io import fits -from astropy.wcs import WCS - - -class FITSCube: - """ - FITS image cube. - """ - def __init__(self, infile=None): - if infile is not None: - self.load(infile) - - def load(self, infile): - with fits.open(infile) as f: - self.data = f[0].data - self.header = f[0].header - print("Loaded FITS cube from file: %s" % infile) - print("Cube dimensions: %dx%dx%d" % - (self.width, self.height, self.nslice)) - # The Z-axis position of the first slice. - self.zbegin = self.header["CRVAL3"] - # The Z-axis step/spacing between slices. - self.zstep = self.header["CDELT3"] - - def add_slices(self, infiles, zbegin=0.0, zstep=1.0): - """ - Create a FITS cube from input image slices. - """ - self.infiles = infiles - self.zbegin = zbegin - self.zstep = zstep - nslice = len(infiles) - header, image = self.open_image(infiles[0]) - shape = (nslice, ) + image.shape - data = np.zeros(shape, dtype=image.dtype) - for i, fn in enumerate(infiles): - print("[%d/%d] Adding image slice: %s ..." % (i+1, nslice, fn)) - hdr, img = self.open_image(fn) - data[i, :, :] = img - self.data = data - self.header = header.copy(strip=True) - print("Created FITS cube of dimensions: %dx%dx%d" % - (self.width, self.height, self.nslice)) - - @staticmethod - def open_image(infile): - """ - Open the slice image and return its header and 2D image data. - - NOTE - ---- - The input slice image may have following dimensions: - * NAXIS=2: [Y, X] - * NAXIS=3: [FREQ=1, Y, X] - * NAXIS=4: [STOKES=1, FREQ=1, Y, X] - - NOTE - ---- - Only open slice image that has only ONE frequency and ONE Stokes - parameter. - - Returns - ------- - header : `~astropy.io.fits.Header` - image : 2D `~numpy.ndarray` - The 2D [Y, X] image part of the slice image. - """ - with fits.open(infile) as f: - header = f[0].header - data = f[0].data - if data.ndim == 2: - # NAXIS=2: [Y, X] - image = data - elif data.ndim == 3 and data.shape[0] == 1: - # NAXIS=3: [FREQ=1, Y, X] - image = data[0, :, :] - elif data.ndim == 4 and data.shape[0] == 1 and data.shape[1] == 1: - # NAXIS=4: [STOKES=1, FREQ=1, Y, X] - image = data[0, 0, :, :] - else: - raise ValueError("Slice '{0}' has invalid dimensions: {1}".format( - infile, data.shape)) - return (header, image) - - @property - def header(self): - if not hasattr(self, "header_"): - self.header_ = fits.Header() - return self.header_ - - @header.setter - def header(self, value): - self.header_ = value - for key in ["CTYPE4", "CRPIX4", "CRVAL4", "CDELT4", "CUNIT4"]: - try: - del self.header_[key] - except KeyError: - pass - - @property - @lru_cache() - def wcs(self): - w = WCS(naxis=3) - w.wcs.ctype = ["pixel", "pixel", "pixel"] - w.wcs.crpix = np.array([self.header.get("CRPIX1", 1.0), - self.header.get("CRPIX2", 1.0), - 1.0]) - w.wcs.crval = np.array([self.header.get("CRVAL1", 0.0), - self.header.get("CRVAL2", 0.0), - self.zbegin]) - w.wcs.cdelt = np.array([self.header.get("CDELT1", 1.0), - self.header.get("CDELT2", 1.0), - self.zstep]) - return w - - def keyword(self, key, value=None, comment=None): - header = self.header - if value is None: - return header[key] - else: - header[key] = (value, comment) - - def write(self, outfile, clobber=False): - header = self.header - header.extend(self.wcs.to_header(), update=True) - header["DATE"] = (datetime.now(timezone.utc).astimezone().isoformat(), - "File creation date") - header.add_history(" ".join(sys.argv)) - hdu = fits.PrimaryHDU(data=self.data, header=header) - try: - hdu.writeto(outfile, overwrite=clobber) - except TypeError: - hdu.writeto(outfile, clobber=clobber) - - @property - def width(self): - __, __, w = self.data.shape - return w - - @property - def height(self): - __, h, __ = self.data.shape - return h - - @property - def nslice(self): - ns, __, __ = self.data.shape - return ns - - @property - @lru_cache() - def zvalues(self): - """ - Calculate the Z-axis positions for all slices - """ - nslice = self.nslice - wcs = self.wcs - pix = np.zeros(shape=(nslice, 3), dtype=int) - pix[:, 2] = np.arange(nslice) - world = wcs.wcs_pix2world(pix, 0) - return world[:, 2] - - @property - def slices(self): - """ - A list of slices in the cube w.r.t. ``zvalues``. - """ - return (self.data[i, :, :] for i in range(self.nslice)) - - def get_slice(self, i, csize=None): - """ - Get the i-th (0-based) slice image, and crop out the central box - of size ``csize`` if specified. - """ - if csize is None: - return self.data[i, :, :] - else: - rows, cols = self.height, self.width - rc, cc = rows//2, cols//2 - cs1, cs2 = csize//2, (csize+1)//2 - return self.data[i, (rc-cs1):(rc+cs2), (cc-cs1):(cc+cs2)] - - def apply_gain(self, gain): - """ - Multiply the supplied ``gain`` to each slice, to achieve slice - or channel response calibration or corruption. - """ - gain = np.asarray(gain) - self.data *= gain[:, np.newaxis, np.newaxis] - - def pool(self, blocksize, func=np.mean): - """ - Down-sampling the images by pooling - """ - try: - from skimage.measure import block_reduce - except ImportError: - print("scikit-image not installed") - raise - - self.data = block_reduce(self.data, - block_size=(1, blocksize, blocksize), - func=func) - self.keyword(key="POOL_BS", value=blocksize, - comment="down-sampling block size") - self.keyword(key="POOL_FUN", value=func.__name__, - comment="down-sampling function/method") - - @property - def unit(self): - """ - Cube data unit. - """ - return self.header.get("BUNIT") - - @unit.setter - def unit(self, value): - self.header["BUNIT"] = value - - @property - def zunit(self): - """ - Unit of the slice z-axis positions. - """ - return self.header.get("CUNIT3") - - @zunit.setter - def zunit(self, value): - self.header["CUNIT3"] = value - - -def cmd_info(args): - """ - Sub-command: "info", show FITS cube information - """ - cube = FITSCube(args.infile) - if cube.zunit: - pzunit = " [%s]" % cube.zunit - else: - pzunit = "" - zvalues = cube.zvalues - nslice = cube.nslice - print("Data cube unit: %s" % cube.unit) - print("Image/slice size: %dx%d" % (cube.width, cube.height)) - print("Number of slices: %d" % nslice) - print("Slice step/spacing: %s%s" % (cube.zstep, pzunit)) - print("Slice positions: %s <-> %s%s" % - (zvalues.min(), zvalues.max(), pzunit)) - if args.stats: - mean = np.zeros(nslice) - std = np.zeros(nslice) - rms = np.zeros(nslice) - for i in range(nslice): - image = cube.get_slice(i, csize=args.center) - if args.abs: - image = np.abs(image) - mean[i] = np.mean(image) - std[i] = np.std(image) - rms[i] = np.sqrt(np.mean(image**2)) - print("Slice <z> <mean> <std> <rms>") - for i, z in enumerate(zvalues): - print("* %12.4e: %12.4e %12.4e %12.4e" % - (z, mean[i], std[i], rms[i])) - if args.outfile: - data = np.column_stack([zvalues, mean, std, rms]) - np.savetxt(args.outfile, data, header="z mean std rms") - print("Saved statistics data to file: %s" % args.outfile) - - -def cmd_create(args): - """ - Sub-command: "create", create a FITS cube - """ - if not args.clobber and os.path.exists(args.outfile): - raise FileExistsError("output file already exists: %s" % args.outfile) - cube = FITSCube() - cube.add_slices(args.infiles, zbegin=args.zbegin, zstep=args.zstep) - cube.zunit = args.zunit - if args.unit: - cube.unit = args.unit - cube.write(args.outfile, clobber=args.clobber) - print("Created FITS cube: %s" % args.outfile) - - -def cmd_crop(args): - """ - Sub-command: "crop", crop the central region of a FITS cube - """ - if not args.clobber and os.path.exists(args.outfile): - raise FileExistsError("output file already exists: %s" % args.outfile) - - cube = FITSCube(args.infile) - print("Image/slice size: %dx%d" % (cube.width, cube.height)) - print("Cropping region size: %dx%d" % (args.size, args.size)) - s_width = slice((cube.width - args.size) // 2, - (cube.width + args.size) // 2) - s_height = slice((cube.height - args.size) // 2, - (cube.height + args.size) // 2) - cube.data = cube.data[:, s_height, s_width] - print("Saving FITS cube ...") - cube.write(args.outfile, clobber=args.clobber) - print("Created FITS cube: %s" % args.outfile) - - -def cmd_add(args): - """ - Sub-command: "add", add two or more FITS cubes - """ - if not args.clobber and os.path.exists(args.outfile): - raise FileExistsError("output file already exists: %s" % args.outfile) - if len(args.infiles) < 2: - raise RuntimeError("Two or more input FITS cubes required") - - cube = FITSCube(args.infiles[0]) - print("Data cube unit: %s" % cube.unit) - print("Image/slice size: %dx%d" % (cube.width, cube.height)) - print("Number of slices: %d" % cube.nslice) - - for f in args.infiles[1:]: - cube2 = FITSCube(f) - assert (cube.unit, cube.zunit) == (cube2.unit, cube2.zunit) - print("Adding cube %s ..." % f) - cube.data = cube.data + cube2.data - - print("Saving FITS cube ...") - cube.write(args.outfile, clobber=args.clobber) - print("Saved FITS cube: %s" % args.outfile) - - -def cmd_mul(args): - """ - Sub-command: "mul", multiply two or more FITS cubes - """ - if not args.clobber and os.path.exists(args.outfile): - raise FileExistsError("output file already exists: %s" % args.outfile) - if len(args.infiles) < 2: - raise RuntimeError("Two or more input FITS cubes required") - - cube = FITSCube(args.infiles[0]) - print("Data cube unit: %s" % cube.unit) - print("Image/slice size: %dx%d" % (cube.width, cube.height)) - print("Number of slices: %d" % cube.nslice) - - for f in args.infiles[1:]: - cube2 = FITSCube(f) - assert (cube.unit, cube.zunit) == (cube2.unit, cube2.zunit) - print("Multiplying cube %s ..." % f) - cube.data = cube.data * cube2.data - - print("Saving FITS cube ...") - cube.write(args.outfile, clobber=args.clobber) - print("Saved FITS cube: %s" % args.outfile) - - -def cmd_sub(args): - """ - Sub-command: "sub", subtract one FITS cube by another one - """ - if not args.clobber and os.path.exists(args.outfile): - raise FileExistsError("output file already exists: %s" % args.outfile) - - cube = FITSCube(args.infile) - print("Data cube unit: %s" % cube.unit) - print("Image/slice size: %dx%d" % (cube.width, cube.height)) - print("Number of slices: %d" % cube.nslice) - - cube2 = FITSCube(args.infile2) - assert (cube.unit, cube.zunit) == (cube2.unit, cube2.zunit) - print("Subtracting cube %s ..." % args.infile2) - cube.data = cube.data - cube2.data - - print("Saving FITS cube ...") - cube.write(args.outfile, clobber=args.clobber) - print("Saved FITS cube: %s" % args.outfile) - - -def cmd_div(args): - """ - Sub-command: "div", divide one FITS cube by another one - """ - if not args.clobber and os.path.exists(args.outfile): - raise FileExistsError("output file already exists: %s" % args.outfile) - - cube = FITSCube(args.infile) - print("Data cube unit: %s" % cube.unit) - print("Image/slice size: %dx%d" % (cube.width, cube.height)) - print("Number of slices: %d" % cube.nslice) - - cube2 = FITSCube(args.infile2) - assert (cube.unit, cube.zunit) == (cube2.unit, cube2.zunit) - print("Dividing cube %s ..." % args.infile2) - with np.errstate(divide='warn'): - cube.data = cube.data / cube2.data - - if args.fill_value: - print("Filling invalid data with: %s" % args.fill_value) - cube.data[~np.isfinite(cube.data)] = float(args.fill_value) - - print("Saving FITS cube ...") - cube.write(args.outfile, clobber=args.clobber) - print("Saved FITS cube: %s" % args.outfile) - - -def cmd_calibrate(args): - """ - Sub-command: "calibrate", calibrate the z-axis slice/channel responses - by fitting a polynomial. - """ - if not args.dryrun: - if args.outfile is None: - raise ValueError("--outfile required") - elif not args.clobber and os.path.exists(args.outfile): - raise OSError("output file already exists: %s" % args.outfile) - - cube = FITSCube(args.infile) - zvalues = cube.zvalues - print("Data cube unit: %s" % cube.unit) - print("Image/slice size: %dx%d" % (cube.width, cube.height)) - print("Number of slices: %d" % cube.nslice) - mean = np.zeros(cube.nslice) - std = np.zeros(cube.nslice) - for i in range(cube.nslice): - image = cube.get_slice(i, csize=args.center) - if args.abs: - image = np.abs(image) - threshold = np.percentile(image, q=100*args.threshold) - data = image[image >= threshold] - mean[i] = np.mean(data) - std[i] = np.std(data) - print("Fitting polynomial order: %d" % args.poly_order) - weights = 1.0 / std - pfit = np.polyfit(zvalues, mean, w=weights, deg=args.poly_order) - mean_new = np.polyval(pfit, zvalues) - coef = mean_new / mean - - if args.dryrun: - print("*** DRY RUN MODE ***") - else: - print("Applying slice/channel calibration gains ...") - cube.apply_gain(coef) - print("Saving calibrated FITS cube ...") - cube.write(args.outfile, clobber=args.clobber) - print("Calibrated FITS cube wrote to: %s" % args.outfile) - - print("Slice <z> <mean.old> +/- <std.old> " + - "<mean.new> <gain.coef>") - for i, z in enumerate(zvalues): - print("* %12.4e: %12.4e %12.4e %12.4e %.6f" % - (z, mean[i], std[i], mean_new[i], coef[i])) - - if args.save_info: - data = np.column_stack([zvalues, mean, std, mean_new, coef]) - header = [ - "Arguments:", - "+ center: %s" % args.center, - "+ abs: %s" % args.abs, - "+ threshold (percentile): %.2f" % args.threshold, - "+ polynomial_order: %d" % args.poly_order, - "", - "Columns:", - "1. z/frequency: z-axis position / frequency [%s]" % cube.zunit, - "2. mean.old: mean before calibration [%s]" % cube.unit, - "3. std.old: standard deviation before calibration", - "4. mean.new: mean after calibration", - "5. gain_coef: calibration coefficient", - "", - ] - infofile = os.path.splitext(args.outfile)[0] + ".txt" - np.savetxt(infofile, data, header="\n".join(header)) - print("Saved calibration information to file: %s" % infofile) - - -def cmd_corrupt(args): - """ - Sub-command: "corrupt", corrupt z-axis slice/channel responses by - applying random gain coefficients. - """ - if not args.clobber and os.path.exists(args.outfile): - raise OSError("output file already exists: %s" % args.outfile) - - cube = FITSCube(args.infile) - zvalues = cube.zvalues - print("Data cube unit: %s" % cube.unit) - print("Image/slice size: %dx%d" % (cube.width, cube.height)) - print("Number of slices: %d" % cube.nslice) - - if args.gaus_sigma is not None: - print("Gaussian sigma: %.1f%%" % args.gaus_sigma) - sigma = args.gaus_sigma * 0.01 - gains = np.random.normal(loc=0.0, scale=sigma, size=cube.nslice) - idx_outliers = np.abs(gains) > 3*sigma - gains[idx_outliers] = np.sign(gains[idx_outliers]) * (3*sigma) - gains += 1.0 - else: - print("Use corruption information from file: %s" % args.infofile) - args.save_info = False # ``--info-file`` discards ``--save-info`` - crpdata = np.loadtxt(args.infofile) - gains = crpdata[:, 1] - - print("Applying slice/channel corruptions ...") - cube.apply_gain(gains) - print("Saving corrupted FITS cube ...") - cube.write(args.outfile, clobber=args.clobber) - print("Corrupted FITS cube wrote to: %s" % args.outfile) - - print("Slice <z> <gain.corruption>") - for z, g in zip(zvalues, gains): - print("* %12.4e: %.6f" % (z, g)) - if args.save_info: - data = np.column_stack([zvalues, gains]) - header = [ - "Arguments:", - "+ gaus_sigma: %.1f%%" % args.gaus_sigma, - "", - "Columns:", - "1. z/frequency: z-axis position / frequency [%s]" % cube.zunit, - "2. gain_corruption: corruption coefficient", - "", - ] - infofile = os.path.splitext(args.outfile)[0] + ".txt" - np.savetxt(infofile, data, header="\n".join(header)) - print("Saved corruption information to file: %s" % infofile) - - -def cmd_pool(args): - """ - Sub-command: "pool", down-sample image cube along the spatial dimension. - """ - if not args.clobber and os.path.exists(args.outfile): - raise OSError("output file already exists: %s" % args.outfile) - - cube = FITSCube(args.infile) - print("Data cube unit: %s" % cube.unit) - print("Image/slice size: %dx%d" % (cube.width, cube.height)) - print("Number of slices: %d" % cube.nslice) - - print("Pooling image cube ...") - print("block size: %d, method: %s" % (args.blocksize, args.method)) - cube.pool(blocksize=args.blocksize, func=getattr(np, args.method)) - print("Pooled image/slice size: %dx%d" % (cube.width, cube.height)) - print("Saving pooled FITS cube ...") - cube.write(args.outfile, clobber=args.clobber) - print("Pooled FITS cube wrote to: %s" % args.outfile) - - -def main(): - parser = argparse.ArgumentParser( - description="FITS image cube manipulation tool") - subparsers = parser.add_subparsers(dest="subparser_name", - title="sub-commands", - help="additional help") - - # sub-command: "info" - parser_info = subparsers.add_parser("info", help="show FITS cube info") - parser_info.add_argument("-a", "--abs", action="store_true", - help="take absolute values for image pixels") - parser_info.add_argument("-c", "--center", type=int, - help="crop the central box region of specified " + - "size to calculate the statistics") - parser_info.add_argument("-s", "--stats", action="store_true", - help="calculate statistics (mean, std, rms) " - "for each slice") - parser_info.add_argument("-o", "--outfile", - help="outfile to save the statistics") - parser_info.add_argument("infile", help="FITS cube filename") - parser_info.set_defaults(func=cmd_info) - - # sub-command: "create" - parser_create = subparsers.add_parser( - "create", aliases=["new"], - help="create a FITS cube from a series of images/slices") - parser_create.add_argument("-C", "--clobber", action="store_true", - help="overwrite existing output file") - parser_create.add_argument("-U", "--data-unit", dest="unit", - help="cube data unit (will overwrite the " + - "slice data unit)") - parser_create.add_argument("-z", "--z-begin", dest="zbegin", - type=float, default=0.0, - help="Z-axis position of the first slice") - parser_create.add_argument("-s", "--z-step", dest="zstep", - type=float, default=1.0, - help="Z-axis step/spacing between slices") - parser_create.add_argument("-u", "--z-unit", dest="zunit", - help="Z-axis unit (e.g., cm, Hz)") - parser_create.add_argument("-o", "--outfile", dest="outfile", - required=True, - help="output FITS cube filename") - parser_create.add_argument("-i", "--infiles", dest="infiles", - nargs="+", required=True, - help="input image slices (in order)") - parser_create.set_defaults(func=cmd_create) - - # sub-command: "crop" - parser_crop = subparsers.add_parser( - "crop", - help="crop the central spatial region of the FITS cube") - parser_crop.add_argument("-C", "--clobber", action="store_true", - help="overwrite existing output file") - parser_crop.add_argument("-n", "--size", type=int, required=True, - help="crop region size (number of pixels)") - parser_crop.add_argument("-o", "--outfile", required=True, - help="output FITS cube filename") - parser_crop.add_argument("-i", "--infile", required=True, - help="input FITS cube") - parser_crop.set_defaults(func=cmd_crop) - - # sub-command: "add" - parser_add = subparsers.add_parser( - "add", - help="add two or more FITS cubes") - parser_add.add_argument("-C", "--clobber", action="store_true", - help="overwrite existing output file") - parser_add.add_argument("-o", "--outfile", required=True, - help="output FITS cube filename") - parser_add.add_argument("-i", "--infiles", nargs="+", required=True, - help="two or more input FITS cubes") - parser_add.set_defaults(func=cmd_add) - - # sub-command: "multiply" - parser_mul = subparsers.add_parser( - "mul", aliases=["multiply"], - help="multiply one FITS cube by another one") - parser_mul.add_argument("-C", "--clobber", action="store_true", - help="overwrite existing output file") - parser_mul.add_argument("-o", "--outfile", required=True, - help="output FITS cube filename") - parser_mul.add_argument("-i", "--infiles", nargs="+", required=True, - help="two or more input FITS cubes") - parser_mul.set_defaults(func=cmd_mul) - - # sub-command: "subtract" - parser_sub = subparsers.add_parser( - "sub", aliases=["subtract"], - help="subtract one FITS cube by another one") - parser_sub.add_argument("-C", "--clobber", action="store_true", - help="overwrite existing output file") - parser_sub.add_argument("-o", "--outfile", required=True, - help="output FITS cube filename") - parser_sub.add_argument("-i", "--infile", required=True, - help="input FITS cube as the minuend") - parser_sub.add_argument("-I", "--infile2", required=True, - help="another input FITS cube as the subtrahend") - parser_sub.set_defaults(func=cmd_sub) - - # sub-command: "divide" - parser_div = subparsers.add_parser( - "div", aliases=["divide"], - help="divide one FITS cube by another one") - parser_div.add_argument("-C", "--clobber", action="store_true", - help="overwrite existing output file") - parser_div.add_argument("-F", "--fill-value", dest="fill_value", - help="value to fill the invalid elements") - parser_div.add_argument("-o", "--outfile", required=True, - help="output FITS cube filename") - parser_div.add_argument("-i", "--infile", required=True, - help="input FITS cube as the dividend") - parser_div.add_argument("-I", "--infile2", required=True, - help="another input FITS cube as the divisor") - parser_div.set_defaults(func=cmd_div) - - # sub-command: "calibrate" - parser_cal = subparsers.add_parser( - "cal", aliases=["calibrate"], - help="calibrate z-axis slice/channel responses by fitting " + - "a polynomial") - parser_cal.add_argument("-n", "--dry-run", dest="dryrun", - action="store_true", - help="dry run mode") - parser_cal.add_argument("-C", "--clobber", dest="clobber", - action="store_true", - help="overwrite existing output file") - parser_cal.add_argument("-c", "--center", dest="center", type=int, - help="crop the central box region of specified " + - "size to calculate the mean/std.") - parser_cal.add_argument("-t", "--threshold", dest="threshold", - type=float, default=0.0, - help="percentile threshold (0 -> 1) and only " + - "considers image pixels with values > threshold " + - "to determine the channel/slice responses; " + - "(default: 0, i.e., all pixels are accounted for)") - parser_cal.add_argument("-a", "--abs", dest="abs", action="store_true", - help="take absolute values for image pixels") - parser_cal.add_argument("-p", "--poly-order", dest="poly_order", - type=int, default=2, - help="order of polynomial used for fitting " + - "(default: 2, i.e., quadratic)") - parser_cal.add_argument("-i", "--infile", dest="infile", required=True, - help="input FITS cube filename") - parser_cal.add_argument("-o", "--outfile", dest="outfile", - help="output calibrated FITS cube (optional " + - "for dry-run model)") - parser_cal.add_argument("--save-info", dest="save_info", - action="store_true", - help="save the calibration information of each " + - "channel/slice to a text file") - parser_cal.set_defaults(func=cmd_calibrate) - - # sub-command: "corrupt" - parser_crp = subparsers.add_parser( - "crp", aliases=["corrupt"], - help="corrupt z-axis slice/channel responses by applying " + - "random gain coefficients") - exgrp_crp = parser_crp.add_mutually_exclusive_group(required=True) - exgrp_crp.add_argument("-G", "--gaus-sigma", dest="gaus_sigma", type=float, - help="Gaussian sigma in percent from which " + - "random gain coefficients are sampled; " + - "specified in percent (e.g., 1 for 1%%)") - exgrp_crp.add_argument("-I", "--info-file", dest="infofile", - help="use the gain coefficients from a " + - "(previously saved) corruption information " + - "file; will also discard argument --save-info") - parser_crp.add_argument("-C", "--clobber", dest="clobber", - action="store_true", - help="overwrite existing output file") - parser_crp.add_argument("-i", "--infile", dest="infile", required=True, - help="input FITS cube filename") - parser_crp.add_argument("-o", "--outfile", dest="outfile", required=True, - help="output corrupted FITS cube") - parser_crp.add_argument("--save-info", dest="save_info", - action="store_true", - help="save the corruption information of each " + - "channel/slice to a text file") - parser_crp.set_defaults(func=cmd_corrupt) - - # sub-command: "pool" - parser_pool = subparsers.add_parser( - "pool", - help="down-sample image cube along the spatial dimensions") - parser_pool.add_argument("-C", "--clobber", dest="clobber", - action="store_true", - help="overwrite existing output file") - parser_pool.add_argument("-i", "--infile", required=True, - help="input FITS cube filename") - parser_pool.add_argument("-o", "--outfile", required=True, - help="output pooled FITS cube") - parser_pool.add_argument("-n", "--block-size", dest="blocksize", - type=int, required=True, - help="down-sampling block size (i.e., factor)") - parser_pool.add_argument("-m", "--method", default="mean", - choices=["mean", "min", "max"], - help="down-sampling method (default: mean)") - parser_pool.set_defaults(func=cmd_pool) - - args = parser.parse_args() - args.func(args) - - -if __name__ == "__main__": - main() diff --git a/astro/fits/fitsimage.py b/astro/fits/fitsimage.py deleted file mode 100755 index af49f7e..0000000 --- a/astro/fits/fitsimage.py +++ /dev/null @@ -1,522 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (c) 2017-2018 Weitian LI <weitian@aaronly.me> -# MIT license -# - -""" -FITS image manipulate tool. -""" - -import sys -import argparse - -import numpy as np -from astropy.io import fits -from scipy import ndimage - - -class FITSImage: - """ - FITS image class that deals with plain 2D image (NAXIS=2), but also - handles single-frequency single-polarized image cube (NAXIS=3, 4), - e.g., created by WSClean. - """ - def __init__(self, infile, pixelsize=None): - self.infile = infile - with fits.open(infile) as f: - self.header = f[0].header.copy(strip=True) - self.data = f[0].data - self.ndim = self.data.ndim - self.shape = self.data.shape - if pixelsize is not None: - self.pixelsize = pixelsize # [arcsec] - - @property - def bunit(self): - return self.header.get("BUNIT") - - @property - def Nx(self): - """ - Number of pixels along the X axis (i.e., image width) - """ - return self.shape[-1] - - @property - def Ny(self): - """ - Number of pixels along the Y axis (i.e., image height) - """ - return self.shape[-2] - - @property - def image(self): - """ - Deal with single-frequency and single-polarized image cube. - """ - if self.ndim == 2: - # NAXIS=2: [Y, X] - image = self.data[:, :].copy() - elif self.ndim == 3 and self.shape[0] == 1: - # NAXIS=3: [FREQ=1, Y, X] - image = self.data[0, :, :].copy() - elif self.ndim == 4 and self.shape[0] == 1 and self.shape[1] == 1: - # NAXIS=4: [STOKES=1, FREQ=1, Y, X] - image = self.data[0, 0, :, :].copy() - else: - raise ValueError("invalid data shape: {1}".format(self.shape)) - return image - - @image.setter - def image(self, value): - if self.ndim == 2: - # NAXIS=2: [Y, X] - self.data = np.array(value) - elif self.ndim == 3: - # NAXIS=3: [FREQ=1, Y, X] - self.data = np.array(value)[np.newaxis, :, :] - else: - # NAXIS=4: [STOKES=1, FREQ=1, Y, X] - self.data = np.array(value)[np.newaxis, np.newaxis, :, :] - - @property - def pixelsize(self): - """ - Image pixel size, in units of [arcsec] - """ - if hasattr(self, "_pixelsize"): - return self._pixelsize - - try: - return self.header["PixSize"] # [arcsec] - except KeyError: - try: - return abs(self.header["CDELT1"]) * 3600 # [deg] -> [arcsec] - except KeyError: - return None - - @pixelsize.setter - def pixelsize(self, value): - # Unit: [arcsec] - oldvalue = self.pixelsize - self._pixelsize = value - # Update header - self.header["PixSize"] = value # [arcsec] - try: - self.header["CDELT1"] *= value / oldvalue - self.header["CDELT2"] *= value / oldvalue - except KeyError: - pass - - @property - def fov(self): - """ - Image FoV coverage: (fov_x, fov_y) - Unit: [deg] - """ - pixelsize = self.pixelsize - if pixelsize: - return (self.Nx*pixelsize/3600, self.Ny*pixelsize/3600) - else: - return None - - def zoom(self, newsize, order=1): - """ - Zoom the image to the specified ``newsize``, meanwhile the header - information will be updated accordingly to preserve the FoV coverage. - - NOTE - ---- - The image aspect ratio cannot be changed. - - Parameters - ---------- - newsize : (Nx, Ny) or N - The size of the zoomed image. - order : int, optional - The interpolation order, default: 1 - """ - try: - Nx2, Ny2 = newsize - except TypeError: - Nx2 = Ny2 = newsize - zoom = ((Ny2+0.1)/self.Ny, (Nx2+0.1)/self.Nx) - if abs(zoom[0] - zoom[1]) > 1e-3: - raise RuntimeError("image aspect ratio cannot be changed") - - pixelsize_old = self.pixelsize - self.image = ndimage.zoom(self.image, zoom=zoom, order=order) - self.pixelsize = pixelsize_old * (self.Nx / Nx2) - return self.image - - def flip(self, direction): - if direction == "lr": - self.image = np.fliplr(self.image) - elif direction == "ud": - self.image = np.flipud(self.image) - else: - raise ValueError("invalid flip direction: %s" % direction) - return self.image - - def rotate(self, to): - if to == "left": - self.image = np.rot90(self.image, k=-1) - elif to == "right": - self.image = np.rot90(self.image, k=1) - elif to == "180": - self.image = np.rot90(self.image, k=2) - else: - raise ValueError("invalid rotate to: %s" % to) - return self.image - - def write(self, outfile, clobber=False): - self.header.add_history(" ".join(sys.argv)) - hdu = fits.PrimaryHDU(data=self.data, header=self.header) - try: - hdu.writeto(outfile, overwrite=clobber) - except TypeError: - hdu.writeto(outfile, clobber=clobber) - - -def show_info(filename, abs_=None, center=None): - """ - Show FITS image information. - """ - fimage = FITSImage(filename) - print("Image data shape: {0}".format(fimage.shape)) - print("Image size: %dx%d" % (fimage.Nx, fimage.Ny)) - print("Data unit: [%s]" % fimage.bunit) - pixelsize = fimage.pixelsize - if pixelsize: - print("Pixel size: %.1f [arcsec]" % pixelsize) - print("Field of view: (%.2f, %.2f) [deg]" % fimage.fov) - data = fimage.image - if abs_: - data = np.abs(data) - if center: - print("Central box size: %d" % center) - rows, cols = data.shape - rc, cc = rows//2, cols//2 - cs1, cs2 = center//2, (center+1)//2 - data = data[(rc-cs1):(rc+cs2), (cc-cs1):(cc+cs2)] - min_ = np.nanmin(data) - max_ = np.nanmax(data) - mean = np.nanmean(data) - median = np.nanmedian(data) - std = np.nanstd(data) - iqr = np.diff(np.nanpercentile(data, q=(25, 75))) - mad = np.nanmedian(np.abs(data - median)) - rms = np.sqrt(np.nanmean(data**2)) - print("min: %13.6e" % min_) - print("max: %13.6e" % max_) - print("range: %13.6e (max - min)" % (max_ - min_)) - print("mean: %13.6e" % mean) - print("median: %13.6e" % median) - print("std: %13.6e (standard deviation)" % std) - print("iqr: %13.6e (interquartile range)" % iqr) - print("mad: %13.6e (median absolute deviation)" % mad) - print("rms: %13.6e (root-mean-squared)" % rms) - - -def cmd_info(args): - """ - Sub-command: "info", show FITS image information - """ - for fn in args.files: - print(">>> %s <<<" % fn) - show_info(fn, abs_=args.abs, center=args.center) - print("") - - -def cmd_add(args): - """ - Sub-command: "add", add the image by a number or other image(s) - """ - fimage = FITSImage(args.infile) - image = fimage.image - if args.number: - print("Add by number: %g" % args.number) - image += args.number - else: - for fn in args.files: - print("Add by another image from: %s" % fn) - fimage2 = FITSImage(fn) - image += fimage2.image - fimage.image = image - fimage.write(args.outfile, clobber=args.clobber) - print("Saved FITS image to: %s" % args.outfile) - - -def cmd_sub(args): - """ - Sub-command: "sub", subtract the image by a number or other image(s) - """ - fimage = FITSImage(args.infile) - image = fimage.image - if args.number: - print("Subtract by number: %g" % args.number) - image -= args.number - else: - for fn in args.files: - print("Subtract by another image from: %s" % fn) - fimage2 = FITSImage(fn) - image -= fimage2.image - fimage.image = image - fimage.write(args.outfile, clobber=args.clobber) - print("Saved FITS image to: %s" % args.outfile) - - -def cmd_mul(args): - """ - Sub-command: "mul", multiply the image by a number or other image(s) - """ - fimage = FITSImage(args.infile) - image = fimage.image - if args.number: - print("Multiply by number: %g" % args.number) - image *= args.number - else: - for fn in args.files: - print("Multiply by another image from: %s" % fn) - fimage2 = FITSImage(fn) - image *= fimage2.image - fimage.image = image - fimage.write(args.outfile, clobber=args.clobber) - print("Saved FITS image to: %s" % args.outfile) - - -def cmd_div(args): - """ - Sub-command: "div", divide the image by a number or other image(s) - """ - fimage = FITSImage(args.infile) - image = fimage.image - if args.number: - print("Divide by number: %g" % args.number) - image /= args.number - else: - for fn in args.files: - print("Divide by another image from: %s" % fn) - fimage2 = FITSImage(fn) - with np.errstate(divide="warn"): - image /= fimage2.image - - if args.fill_value: - print("Filling invalid data with: %s" % args.fill_value) - image[~np.isfinite(image)] = float(args.fill_value) - fimage.image = image - fimage.write(args.outfile, clobber=args.clobber) - print("Saved FITS image to: %s" % args.outfile) - - -def cmd_zoom(args): - """ - Sub-command: "zoom", zoom the image to a new size with FoV coverage - preserved. - """ - fimage = FITSImage(args.infile) - print("Image size: %dx%d" % (fimage.Nx, fimage.Ny)) - pixelsize = fimage.pixelsize - if pixelsize is None: - raise RuntimeError("--pixelsize required") - else: - print("Pixel size: %.1f [arcsec]" % pixelsize) - print("Field of view: (%.2f, %.2f) [deg]" % fimage.fov) - - print("Zooming image ...") - print("Interpolation order: %d" % args.order) - print("Zoomed image size: %dx%d" % (args.size, args.size)) - fimage.zoom(newsize=args.size, order=args.order) - print("Zoomed image pixel size: %.1f [arcsec]" % fimage.pixelsize) - fimage.write(args.outfile, clobber=args.clobber) - print("Saved zoomed FITS image to: %s" % args.outfile) - - -def cmd_flip(args): - """ - Sub-command: "flip", flip the image left-right or up-down. - """ - fimage = FITSImage(args.infile) - print("Flipping image ...") - direction = "lr" if args.lr else "ud" - print("Flip direction: %s" % direction) - fimage.flip(direction) - fimage.write(args.outfile, clobber=args.clobber) - print("Saved flipped FITS image to: %s" % args.outfile) - - -def cmd_rotate(args): - """ - Sub-command: "rotate", rotate the image. - """ - fimage = FITSImage(args.infile) - print("Rotating image ...") - if args.left: - to = "left" - elif args.right: - to = "right" - else: - to = "180" - print("Rotate to: %s" % to) - fimage.rotate(to) - fimage.write(args.outfile, clobber=args.clobber) - print("Saved rotated FITS image to: %s" % args.outfile) - - -def main(): - parser = argparse.ArgumentParser( - description="FITS image manipulation tool") - subparsers = parser.add_subparsers(dest="subparser_name", - title="sub-commands", - help="additional help") - - # sub-command: "info" - parser_info = subparsers.add_parser( - "info", aliases=["show"], - help="show FITS image info") - parser_info.add_argument("-c", "--center", dest="center", type=int, - help="choose central region of specified size") - parser_info.add_argument("-a", "--abs", dest="abs", action="store_true", - help="take absolute values of image pixels") - parser_info.add_argument("files", nargs="+", help="FITS image filename") - parser_info.set_defaults(func=cmd_info) - - # sub-command: "add" - parser_add = subparsers.add_parser( - "add", - help="add the image by a number or other image(s)") - parser_add.add_argument("-C", "--clobber", dest="clobber", - action="store_true", - help="overwrite existing output file") - parser_add.add_argument("-i", "--infile", dest="infile", required=True, - help="input FITS image") - parser_add.add_argument("-o", "--outfile", dest="outfile", required=True, - help="output FITS image") - exgrp_add = parser_add.add_mutually_exclusive_group(required=True) - exgrp_add.add_argument("-n", "--number", dest="number", type=float, - help="number to be added by") - exgrp_add.add_argument("-f", "--files", dest="files", nargs="+", - help="FITS image(s) to be added by") - parser_add.set_defaults(func=cmd_add) - - # sub-command: "sub" - parser_sub = subparsers.add_parser( - "sub", aliases=["subtract"], - help="subtract the image by a number or other image(s)") - parser_sub.add_argument("-C", "--clobber", dest="clobber", - action="store_true", - help="overwrite existing output file") - parser_sub.add_argument("-i", "--infile", dest="infile", required=True, - help="input FITS image") - parser_sub.add_argument("-o", "--outfile", dest="outfile", required=True, - help="output FITS image") - exgrp_sub = parser_sub.add_mutually_exclusive_group(required=True) - exgrp_sub.add_argument("-n", "--number", dest="number", type=float, - help="number to be subtracted by") - exgrp_sub.add_argument("-f", "--files", dest="files", nargs="+", - help="FITS image(s) to be subtracted by") - parser_sub.set_defaults(func=cmd_sub) - - # sub-command: "mul" - parser_mul = subparsers.add_parser( - "mul", aliases=["multiply"], - help="multiply the image by a number or other image(s)") - parser_mul.add_argument("-C", "--clobber", dest="clobber", - action="store_true", - help="overwrite existing output file") - parser_mul.add_argument("-i", "--infile", dest="infile", required=True, - help="input FITS image") - parser_mul.add_argument("-o", "--outfile", dest="outfile", required=True, - help="output FITS image") - exgrp_mul = parser_mul.add_mutually_exclusive_group(required=True) - exgrp_mul.add_argument("-n", "--number", dest="number", type=float, - help="number to be multiplied by") - exgrp_mul.add_argument("-f", "--files", dest="files", nargs="+", - help="FITS image(s) to be multiplied by") - parser_mul.set_defaults(func=cmd_mul) - - # sub-command: "div" - parser_div = subparsers.add_parser( - "div", aliases=["divide"], - help="divide the image by a number or other image(s)") - parser_div.add_argument("-C", "--clobber", dest="clobber", - action="store_true", - help="overwrite existing output file") - parser_div.add_argument("-F", "--fill-value", dest="fill_value", - help="value to fill the invalid elements") - parser_div.add_argument("-i", "--infile", dest="infile", required=True, - help="input FITS image") - parser_div.add_argument("-o", "--outfile", dest="outfile", required=True, - help="output FITS image") - exgrp_div = parser_div.add_mutually_exclusive_group(required=True) - exgrp_div.add_argument("-n", "--number", dest="number", type=float, - help="number to be divided by") - exgrp_div.add_argument("-f", "--files", dest="files", nargs="+", - help="FITS image(s) to be divided by") - parser_div.set_defaults(func=cmd_div) - - # sub-command: "zoom" - parser_zoom = subparsers.add_parser( - "zoom", aliases=["rescale"], - help="zoom the image to a new size with FoV coverage preserved") - parser_zoom.add_argument("-C", "--clobber", dest="clobber", - action="store_true", - help="overwrite existing output file") - parser_zoom.add_argument("--order", type=int, default=1, - help="zoom interpolation order (default: 1)") - parser_zoom.add_argument("-s", "--size", type=int, required=True, - help="zoomed image size (number of pixels)") - parser_zoom.add_argument("-p", "--pixelsize", type=float, - help="input FITS image pixel size [arcsec] " + - "(default: try to obtain from FITS header)") - parser_zoom.add_argument("-i", "--infile", dest="infile", required=True, - help="input FITS image") - parser_zoom.add_argument("-o", "--outfile", dest="outfile", required=True, - help="output zoomed FITS image") - parser_zoom.set_defaults(func=cmd_zoom) - - # sub-command: "flip" - parser_flip = subparsers.add_parser( - "flip", help="flip the image left-right or up-down") - parser_flip.add_argument("-C", "--clobber", action="store_true", - help="overwrite existing output file") - parser_flip.add_argument("-i", "--infile", required=True, - help="input FITS image") - parser_flip.add_argument("-o", "--outfile", required=True, - help="output flipped FITS image") - exgrp_flip = parser_flip.add_mutually_exclusive_group(required=True) - exgrp_flip.add_argument("-l", "--left-right", dest="lr", - action="store_true", - help="flip in the left/right direction") - exgrp_flip.add_argument("-u", "--up-down", dest="ud", - action="store_true", - help="flip in the left/right direction") - parser_flip.set_defaults(func=cmd_flip) - - # sub-command: "rotate" - parser_rot = subparsers.add_parser( - "rot", aliases=["rotate"], - help="rotate the image") - parser_rot.add_argument("-C", "--clobber", action="store_true", - help="overwrite existing output file") - parser_rot.add_argument("-i", "--infile", required=True, - help="input FITS image") - parser_rot.add_argument("-o", "--outfile", required=True, - help="output rotated FITS image") - exgrp_rot = parser_rot.add_mutually_exclusive_group(required=True) - exgrp_rot.add_argument("-l", "--left", action="store_true", - help="rotate left") - exgrp_rot.add_argument("-r", "--right", action="store_true", - help="rotate right") - exgrp_rot.add_argument("-u", "--180", dest="ud", - action="store_true", - help="rotate 180 degree") - parser_rot.set_defaults(func=cmd_rotate) - - args = parser.parse_args() - args.func(args) - - -if __name__ == "__main__": - main() |