1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
#!/usr/bin/env python3
#
# Copyright (c) 2018 Aaron LI <aly@aaronly.me>
# MIT license
#
"""
Calculate the spectrograms of the FITS cube.
If the discrete Fourier transform (DFT/FFT) performed, output the real,
imaginary, and magnitude spectrograms. If the discrete cosine transform
(DCT) performed, then output only the magnitude spectrogram since it's
real numbered.
The transform is applied along the frequency axis, while the two spatial
dimensions are flattened.
"""
import sys
import argparse
import logging
import numpy as np
from scipy import signal
from scipy import fftpack
from astropy.io import fits
logging.basicConfig(level=logging.INFO,
format="[%(levelname)s:%(lineno)d] %(message)s")
logger = logging.getLogger()
class Spectrogram:
def __init__(self, cube):
cube = np.array(cube, dtype=float) # [Freq, Y, X]
nfreq, ny, nx = cube.shape
self.data = cube.reshape((nfreq, ny*nx))
logger.info('Cube: %d frequencies, %d pixels' % self.data.shape)
@property
def nfreq(self):
return self.data.shape[0]
@property
def npix(self):
return self.data.shape[1]
def set_window(self, window):
if window.upper() == 'NONE':
return
logger.info('Generating %s window ...' % window)
wfunc = getattr(signal.windows, window)
w = wfunc(self.nfreq, sym=False)
logger.info('Applying window ...')
self.data *= w[:, np.newaxis]
def calc_cosine(self):
logger.info('Calculating DCT ...')
return fftpack.dct(self.data, axis=0, norm='ortho')
def calc_fourier(self):
logger.info('Calculating FFT ...')
z = fftpack.fftshift(fftpack.fft(self.data, axis=0), axes=0)
return (np.abs(z), np.real(z), np.imag(z))
def main():
parser = argparse.ArgumentParser(
description='Calculate spectrogram of an image cube')
parser.add_argument('-C', '--clobber', action='store_true')
parser.add_argument('-w', '--window', required=True,
choices=['none', 'nuttall', 'hanning'],
help='window function')
parser.add_argument('-t', '--type', required=True,
choices=['cosine', 'fourier'],
help='spectrogram type')
parser.add_argument('-i', '--infile', required=True,
help='input image cube')
parser.add_argument('-M', '--out-mag', dest='outmag', required=True,
help='output magnitude spectrogram')
parser.add_argument('-R', '--out-real', dest='outreal',
help='output real spectrogram (type "fourier")')
parser.add_argument('-I', '--out-imag', dest='outimag',
help='output imaginary spectrogram (type "fourier")')
args = parser.parse_args()
if args.type == 'fourier':
assert (args.outreal and args.outimag), '--real and --imag required'
cube = fits.open(args.infile)[0].data
logger.info("Cube shape: %dx%dx%d" % cube.shape)
header = fits.Header()
header.add_history(' '.join(sys.argv))
header['SP_TYPE'] = (args.type, 'cosine (DCT) or fourier (FFT)')
spectrogram = Spectrogram(cube)
spectrogram.set_window(args.window)
header['SP_WIND'] = (args.window, 'window function along frequency')
if args.type == 'cosine':
sp_mag = spectrogram.calc_cosine()
header['SP_DATA'] = 'magnitude'
fits.PrimaryHDU(data=sp_mag, header=header).writeto(
args.outmag, overwrite=args.clobber)
logger.info('Wrote magnitude spectrogram to: %s' % args.outmag)
else:
sp_mag, sp_real, sp_imag = spectrogram.calc_fourier()
header['SP_DATA'] = 'magnitude'
fits.PrimaryHDU(data=sp_mag, header=header).writeto(
args.outmag, overwrite=args.clobber)
logger.info('Wrote magnitude spectrogram to: %s' % args.outmag)
header['SP_DATA'] = 'real'
fits.PrimaryHDU(data=sp_real, header=header).writeto(
args.outreal, overwrite=args.clobber)
logger.info('Wrote real spectrogram to: %s' % args.outreal)
header['SP_DATA'] = 'imaginary'
fits.PrimaryHDU(data=sp_imag, header=header).writeto(
args.outimag, overwrite=args.clobber)
logger.info('Wrote imaginary spectrogram to: %s' % args.outimag)
if __name__ == '__main__':
main()
|