aboutsummaryrefslogtreecommitdiffstats
path: root/astro/oskar
diff options
context:
space:
mode:
authorAaron LI <aly@aaronly.me>2017-08-22 15:50:59 +0800
committerAaron LI <aly@aaronly.me>2017-08-22 15:50:59 +0800
commit5578504506edfd121fd80cbc50c32a462504cd48 (patch)
treee4a866aa4f0c02cc8eede1800b71a1e7b6bbfa3e /astro/oskar
parentd4b05a8233310bb0c7cd319f2a30cd91142a5671 (diff)
downloadatoolbox-5578504506edfd121fd80cbc50c32a462504cd48.tar.bz2
Add oskar/taper_sky.py: Taper sky image with Tukey window
Diffstat (limited to 'astro/oskar')
-rwxr-xr-xastro/oskar/taper_sky.py198
1 files changed, 198 insertions, 0 deletions
diff --git a/astro/oskar/taper_sky.py b/astro/oskar/taper_sky.py
new file mode 100755
index 0000000..04a9177
--- /dev/null
+++ b/astro/oskar/taper_sky.py
@@ -0,0 +1,198 @@
+#!/usr/bin/env python3
+#
+# Copyright (c) 2017 Weitian LI <weitian@aaronly.me>
+# MIT License
+#
+
+"""
+Taper the sky image (input of OSKAR simulation) to mitigate the
+side lobes effects, which causes trouble in creating good images.
+
+The circular Tukey window is adopted, which is also built in by
+e.g., WSClean.
+"""
+
+import os
+import sys
+import argparse
+import logging
+from datetime import datetime, timezone
+
+import numpy as np
+from scipy import interpolate
+import astropy.io.fits as fits
+
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(os.path.basename(sys.argv[0]))
+
+
+def make2d(w1d, x=None):
+ """
+ Create 2D filter from the 1D one.
+
+ Parameters
+ ----------
+ w1d : 1D `~numpy.ndarray`
+ The input 1D filter/window
+ x : 1D `~numpy.ndarray`, optional
+ The X-axis values of the input ``w1d`` filter
+
+ Returns
+ -------
+ w2d : 2D `~numpy.ndarray`
+ Created 2D filter/window from the input 1D one.
+
+ Credit
+ ------
+ [1] MATLAB - creating 2D convolution filters
+ https://cn.mathworks.com/matlabcentral/newsreader/view_thread/23588
+ """
+ if x is None:
+ L = len(w1d)
+ M = (L-1) / 2
+ x = np.linspace(-M, M, num=L)
+
+ xmax = np.max(x)
+ xsize = int(2 * xmax + 1)
+ xx = np.linspace(-xmax, xmax, num=xsize)
+ xg, yg = np.meshgrid(xx, xx)
+ r = np.sqrt(xg**2 + yg**2)
+ ridx = (r <= xmax)
+ w2d = np.zeros(shape=(xsize, xsize))
+ finterp = interpolate.interp1d(x=x, y=w1d, kind="linear")
+ w2d[ridx] = finterp(r[ridx])
+
+ return w2d
+
+
+def tukey(L, Minner=0.5, Mouter=0.5, Lmin=0):
+ """
+ The 1D Tukey filter/window
+
+ ^
+ | Lmin | Minner*L | constant ones | Mouter*L |
+ +--------------------------------------------. (L)
+
+ Parameters
+ ----------
+ L : int
+ The length (number of points) of the window/filter
+ Minner : float
+ The fraction of ``L`` for the inner (left), increasing part
+ Mouter : float
+ The fraction of ``L`` for the outer (left), decreasing part
+ Lmin : int
+ The beginning point for the inner (left) part
+
+ References
+ ----------
+ * MATLAB - Tukey (tapered cosine) window
+ https://cn.mathworks.com/help/signal/ref/tukeywin.html
+ * WSClean - Weight tapering
+ https://sourceforge.net/p/wsclean/wiki/Tapering/
+ """
+ w = np.zeros(L)
+ Linner = int(Minner * L)
+ Louter = int(Mouter * L)
+
+ Lones = L - Lmin - Linner - Louter
+ if Lones < 0:
+ raise RuntimeError("invalid parameters combination")
+ w[(Lmin+Linner):(Lmin+Linner+Lones)] = 1.0
+
+ if Linner > 0:
+ xinner = np.arange(0, Linner)
+ x2 = (np.pi/Linner) * (xinner - Linner)
+ w[Lmin:(Lmin+Linner)] = 0.5 + 0.5 * np.cos(x2)
+
+ if Louter > 0:
+ xouter = np.arange(0, Louter)
+ x2 = (np.pi/Louter) * xouter
+ w[(Lmin+Linner+Lones):] = 0.5 + 0.5 * np.cos(x2)
+
+ return w
+
+
+def main():
+ parser = argparse.ArgumentParser(
+ description="Taper sky image with Tukey window")
+ parser.add_argument("-C", "--clobber", dest="clobber",
+ action="store_true",
+ help="overwrite existing file")
+ parser.add_argument("-r", "--window-inner", dest="Rinner",
+ type=int, required=True,
+ help="inner radius [pixel] of the tukey window")
+ parser.add_argument("-R", "--window-outer", dest="Router",
+ type=int, required=True,
+ help="outer radius [pixel] of the tukey window")
+ parser.add_argument("-T", "--outfile-taper", dest="outfile_taper",
+ help="save the applied taper into a FITS image")
+ parser.add_argument("infile", help="input FITS sky image")
+ parser.add_argument("outfile", help="output tapered sky image")
+ args = parser.parse_args()
+
+ if os.path.exists(args.outfile):
+ if args.clobber:
+ logger.warning("Removed existing file: %s" % args.outfile)
+ os.remove(args.outfile)
+ else:
+ raise OSError("Output file already exists: %s" % args.outfile)
+ if os.path.exists(args.outfile_taper):
+ if args.clobber:
+ logger.warning("Removed existing file: %s" % args.outfile_taper)
+ os.remove(args.outfile_taper)
+ else:
+ raise OSError("Output file already exists: %s" %
+ args.outfile_taper)
+
+ with fits.open(args.infile) as f:
+ image = f[0].data
+ header = f[0].header
+ logger.info("Read sky image from file: %s" % args.infile)
+
+ L = args.Router
+ Louter = L - args.Rinner
+ Mouter = Louter / L
+ logger.info("Create Tukey window: Rin=%d, Rout=%d" %
+ (args.Rinner, args.Router))
+ w1d = tukey(L, Minner=0, Mouter=Mouter)
+ x1d = np.arange(len(w1d))
+ logger.info("Make 2D Tukey window")
+ w2d = make2d(w1d, x1d)
+
+ xw = w2d.shape[0]
+ ximg = image.shape[0]
+ if xw == ximg:
+ taper = w2d
+ elif xw > ximg:
+ i1 = int((xw-ximg) / 2)
+ i2 = int((xw+ximg) / 2)
+ taper = w2d[i1:i2, i1:i2]
+ else:
+ taper = np.zeros(shape=image.shape)
+ i1 = int((ximg-xw) / 2)
+ i2 = int((ximg+xw) / 2)
+ taper[i1:i2, i1:i2] = w2d
+
+ logger.info("Taper the sky image ...")
+ image *= taper
+
+ header["DATE"] = (datetime.now(timezone.utc).astimezone().isoformat(),
+ "File creation date")
+ header.add_history(" ".join(sys.argv))
+ hdu = fits.PrimaryHDU(data=image, header=header)
+ hdu.writeto(args.outfile)
+ logger.info("Wrote tapered sky image to file: %s" % args.outfile)
+
+ if args.outfile_taper:
+ header["OBJECT"] = ("Tukey Window", "Taper window")
+ header["Rinner"] = (args.Rinner, "[pixel] inner radius")
+ header["Router"] = (args.Router, "[pixel] outer radius")
+ hdu = fits.PrimaryHDU(data=taper, header=header)
+ hdu.writeto(args.outfile_taper)
+ logger.info("Wrote tapered to file: %s" % args.outfile_taper)
+
+
+if __name__ == "__main__":
+ main()