import datetime
import glob
import os
import time
import warnings
import numpy as np
import astropy.units as u
import paircars.utils.hyperbeam as mwa_hyperbeam
from scipy.interpolate import RegularGridInterpolator
from astropy.io import fits
from astropy.time import Time
from astropy.coordinates import EarthLocation, SkyCoord, AltAz
from .basic_utils import get_datadir
from .udocker_utils import run_wsclean
from .resource_utils import limit_threads
warnings.filterwarnings("ignore")
datadir = get_datadir()
MWA_PB_file_paircars = f"{datadir}/mwa_full_embedded_element_pattern.h5"
sweet_spot_file_paircars = f"{datadir}/MWA_sweet_spots.npy"
haslam_map_paircars = f"{datadir}/haslam_map.fits"
MWALON = 116.67
MWALAT = -26.7
MWAALT = 377.8
MWAPOS = EarthLocation.from_geodetic(
lon="116:40:14.93", lat="-26:42:11.95", height=377.8
)
[docs]
def get_azza_from_fits(filename, metafits):
"""
Get azimuith and zenith angle arrays from fits file
Parameters
----------
filename : str
Name of the fits file
metafits : str
Metafits file
Returns
-------
dict
{'za_rad': theta,'astro_az_rad': phi}
"""
import astropy.wcs as pywcs
f = fits.open(filename)
h = f[0].header
f.close()
wcs = pywcs.WCS(h)
naxes = h["NAXIS"]
x = np.arange(1, h["NAXIS1"] + 1)
y = np.arange(1, h["NAXIS2"] + 1)
Y, X = np.meshgrid(y, x)
Xflat = X.flatten()
Yflat = Y.flatten()
FF = np.ones(Xflat.shape)
if naxes >= 4:
Tostack = [Xflat, Yflat, FF]
for i in range(3, naxes):
Tostack.append(np.ones(Xflat.shape))
else:
Tostack = [Xflat, Yflat]
pixcrd = np.vstack(Tostack).transpose()
sky = wcs.wcs_pix2world(pixcrd, 1)
################
# Extract RA-DEC
################
ra = sky[:, 0]
dec = sky[:, 1]
RA = ra.reshape(X.shape)
Dec = dec.reshape(Y.shape)
##########################################
# Get the date so we can convert to Az,El
##########################################
metadata = fits.getheader(metafits)
d = metadata["DATE-OBS"]
if "." in d:
d = d.split(".")[0]
dt = datetime.datetime.strptime(d, "%Y-%m-%dT%H:%M:%S")
mwatime = Time(dt)
#############################
# Transform to Alt-Az
#############################
source = SkyCoord(ra=RA, dec=Dec, frame="icrs", unit=(u.deg, u.deg))
source.location = MWAPOS
source.obstime = mwatime
time.time()
source_altaz = source.transform_to("altaz")
Alt, Az = (
source_altaz.alt.deg,
source_altaz.az.deg,
)
theta = (90 - Alt) * np.pi / 180
phi = Az * np.pi / 180
return {"za_rad": theta.transpose(), "astro_az_rad": phi.transpose()}
[docs]
def get_IQUV(filename, stokesaxis=4):
"""Get IQUV from a fits file"""
data = fits.getdata(filename)
stokes = {}
if stokesaxis == 3:
stokes["I"] = data[0, 0, :, :]
stokes["Q"] = data[0, 1, :, :]
stokes["U"] = data[0, 2, :, :]
stokes["V"] = data[0, 3, :, :]
elif stokesaxis == 4:
stokes["I"] = data[0, 0, :, :]
stokes["Q"] = data[1, 0, :, :]
stokes["U"] = data[2, 0, :, :]
stokes["V"] = data[3, 0, :, :]
else:
stokes["I"] = data[0, 0, :, :]
stokes["Q"] = data[0, 0, :, :] * 0
stokes["U"] = data[0, 0, :, :] * 0
stokes["V"] = data[0, 0, :, :] * 0
return stokes
[docs]
def get_inst_pols(stokes, iau_order=True):
"""Return instrumental polaristations matrix (Vij)"""
if iau_order:
XX = stokes["I"] + stokes["Q"]
XY = stokes["U"] + stokes["V"] * 1j
YX = stokes["U"] - stokes["V"] * 1j
YY = stokes["I"] - stokes["Q"]
else:
XX = stokes["I"] - stokes["Q"]
XY = stokes["U"] - stokes["V"] * 1j
YX = stokes["U"] + stokes["V"] * 1j
YY = stokes["I"] + stokes["Q"]
Vij = np.array([[XX, XY], [YX, YY]])
Vij = np.swapaxes(np.swapaxes(Vij, 0, 2), 1, 3)
return Vij
[docs]
def B2IQUV(B, iau_order=False):
"""
Convert sky brightness matrix to I, Q, U, V
Parameters
----------
B : numpy.array
Brightness matrix array
iau_order : bool, optional
Whether brightness matrix is in IAU or MWA convention
Returns
-------
dict
Stokes dictionary
"""
B11 = B[:, :, 0, 0]
B12 = B[:, :, 0, 1]
B21 = B[:, :, 1, 0]
B22 = B[:, :, 1, 1]
if iau_order:
stokes = {}
stokes["I"] = (B11 + B22) / 2.0
stokes["Q"] = (B11 - B22) / 2.0
stokes["U"] = (B12 + B21) / 2.0
stokes["V"] = 1j * (B21 - B12) / 2.0
else:
stokes = {}
stokes["I"] = (B11 + B22) / 2.0
stokes["Q"] = (B22 - B11) / 2.0
stokes["U"] = (B21 + B12) / 2.0
stokes["V"] = 1j * (B12 - B21) / 2.0
return stokes
[docs]
def all_sky_beam_interpolator(
sweet_spot_num,
freq,
resolution,
ncpu=-1,
MWA_PB_file="",
sweet_spot_file="",
iau_order=False,
):
"""
Calculate all sky beam interpolation for given sweet spot pointing
Parameters
----------
sweet_spot_num : int
Sweet spot number
freq : float
Frequency in MHz
resolution : float
Spatial resolution in degree
ncpu : int, optional
Number of CPU threads to use
MWA_PB_file : str, optional
MWA primary beam file
sweet_spot_file : str, optional
MWA sweet spot file name
iau_order : bool, optional
PB Jones in IAU order or not
Returns
-------
numpy.array
All sky primary beam Jones array
"""
ncpu = max(1, ncpu)
limit_threads(n_threads=ncpu)
from scipy.interpolate import RectBivariateSpline
if MWA_PB_file == "" or os.path.exists(MWA_PB_file) is False:
MWA_PB_file = MWA_PB_file_paircars
if sweet_spot_file == "" or os.path.exists(sweet_spot_file) is False:
sweet_spot_file = sweet_spot_file_paircars
beam = mwa_hyperbeam.FEEBeam(MWA_PB_file)
az_scale = np.arange(0, 360, resolution)
alt_scale = np.arange(0, 90, resolution)
az, alt = np.meshgrid(az_scale, alt_scale)
za_rad = np.deg2rad(90 - alt.ravel()) # Zenith angle in radian
az_rad = np.deg2rad(az.ravel()) # Azimuth in radian
sweet_spots = np.load(sweet_spot_file, allow_pickle=True).all()
delay = sweet_spots[int(sweet_spot_num)][-1]
##############################################
# Calculating Jones array in 1 deg alt-az grid
##############################################
jones = beam.calc_jones_array(
az_rad,
za_rad,
freq * 10**6,
delay,
[1] * 16,
True,
np.deg2rad(MWALAT),
iau_order,
)
jones = jones.swapaxes(0, 1).reshape(4, alt_scale.shape[0], az_scale.shape[0])
j00_r = RectBivariateSpline(
x=alt_scale, y=az_scale, z=np.nan_to_num(np.real(jones[0, ...]))
)
j00_i = RectBivariateSpline(
x=alt_scale, y=az_scale, z=np.nan_to_num(np.imag(jones[0, ...]))
)
j01_r = RectBivariateSpline(
x=alt_scale, y=az_scale, z=np.nan_to_num(np.real(jones[1, ...]))
)
j01_i = RectBivariateSpline(
x=alt_scale, y=az_scale, z=np.nan_to_num(np.imag(jones[1, ...]))
)
j10_r = RectBivariateSpline(
x=alt_scale, y=az_scale, z=np.nan_to_num(np.real(jones[2, ...]))
)
j10_i = RectBivariateSpline(
x=alt_scale, y=az_scale, z=np.nan_to_num(np.imag(jones[2, ...]))
)
j11_r = RectBivariateSpline(
x=alt_scale, y=az_scale, z=np.nan_to_num(np.real(jones[3, ...]))
)
j11_i = RectBivariateSpline(
x=alt_scale, y=az_scale, z=np.nan_to_num(np.imag(jones[3, ...]))
)
return j00_r, j00_i, j01_r, j01_i, j10_r, j10_i, j11_r, j11_i
[docs]
def get_coarse_resolution(freq):
"""
Get coarse sptial resolution based on frequency
Parameters
----------
freq : float
Frequency in MHz
Returns
-------
float
Spatial resolution in degree
"""
wavelength = 299792458.0 / (freq * 10**6)
dish_dia = 16.0
FOV = 1.22 * wavelength / dish_dia
fov_deg = np.rad2deg(FOV)
resolution = max(0.25, round(fov_deg / 20, 2))
return resolution
[docs]
def get_jones_array(
alt_arr,
az_arr,
freq,
gridpoint,
ncpu=-1,
interpolated=True,
MWA_PB_file="",
sweet_spot_file="",
iau_order=False,
):
"""
Get primary beam jones matrix
Parameters
----------
alt_arr : numpy.array
Flattened altitude array in degrees
az_arr : numpy.array
Flattened azimuth array in degrees
freq : float
Frequency in MHz
gridpoint : int
Gridpoint number
ncpu : int, optional
Number of CPU threads to use
interpolated : bool, optional
Use spatially interpolated beams or not
MWA_PB_file : str, optional
Primary beam file name
sweet_spot_file : str, optional
MWA sweet spot file name
iau_order : bool, optional
IAU order of the beam
Returns
-------
numpy.array
Jones array (shape : coordinate_arr_shape, 2 ,2)
"""
ncpu = max(1, ncpu)
limit_threads(n_threads=ncpu)
from joblib import Parallel, delayed as jobdelayed
if MWA_PB_file == "" or os.path.exists(MWA_PB_file) is False:
MWA_PB_file = MWA_PB_file_paircars
if sweet_spot_file == "" or os.path.exists(sweet_spot_file) is False:
sweet_spot_file = sweet_spot_file_paircars
# Change resolution based on frequency
coarse_resolution = get_coarse_resolution(freq)
if interpolated:
j00_r, j00_i, j01_r, j01_i, j10_r, j10_i, j11_r, j11_i = (
all_sky_beam_interpolator(
int(gridpoint),
freq,
coarse_resolution,
ncpu=ncpu,
MWA_PB_file=MWA_PB_file,
sweet_spot_file=sweet_spot_file,
iau_order=iau_order,
)
)
with Parallel(n_jobs=ncpu, backend="multiprocessing") as parallel:
results = parallel(
[
jobdelayed(j00_r)(alt_arr, az_arr, grid=False),
jobdelayed(j00_i)(alt_arr, az_arr, grid=False),
jobdelayed(j01_r)(alt_arr, az_arr, grid=False),
jobdelayed(j01_i)(alt_arr, az_arr, grid=False),
jobdelayed(j10_r)(alt_arr, az_arr, grid=False),
jobdelayed(j10_i)(alt_arr, az_arr, grid=False),
jobdelayed(j11_r)(alt_arr, az_arr, grid=False),
jobdelayed(j11_i)(alt_arr, az_arr, grid=False),
]
)
del parallel
(
j00_r_arr,
j00_i_arr,
j01_r_arr,
j01_i_arr,
j10_r_arr,
j10_i_arr,
j11_r_arr,
j11_i_arr,
) = results
j00 = j00_r_arr + 1j * j00_i_arr
j01 = j01_r_arr + 1j * j01_i_arr
j10 = j10_r_arr + 1j * j10_i_arr
j11 = j11_r_arr + 1j * j11_i_arr
j00 = j00.reshape(az_arr.shape)
j01 = j01.reshape(az_arr.shape)
j10 = j10.reshape(az_arr.shape)
j11 = j11.reshape(az_arr.shape)
jones_array = np.array([j00, j01, j10, j11]).T
else:
beam = mwa_hyperbeam.FEEBeam(MWA_PB_file)
sweet_spots = np.load(sweet_spot_file, allow_pickle=True).all()
delay = sweet_spots[int(gridpoint)][-1]
za_arr = 90 - alt_arr
jones_array = beam.calc_jones_array(
np.deg2rad(az_arr),
np.deg2rad(za_arr),
freq * 10**6,
delay,
[1] * 16,
True,
np.deg2rad(MWALAT),
iau_order,
)
jones_array = jones_array.reshape(jones_array.shape[0], 2, 2)
return jones_array
[docs]
def get_pb_radec(
ra,
dec,
freq,
metafits,
ncpu=-1,
MWA_PB_file="",
sweet_spot_file="",
iau_order=False,
):
"""
Function to get MWA primary beam at specific RA, DEC
Parameters
----------
ra : str
RA either in degree or 'hh:mm:ss' or '%fh%fm%fs' format
dec : str
DEC either in degree or 'dd:mm:ss' or '%fd%fm%fs'format
freq : float
Frequency in MHz
metafits : str
MWA metafits file
ncpu : int, optional
Number of CPU threads
MWA_PB_file : str, optional
MWA primary beam file path
sweet_spot_file : str, optional
Sweetspot file name
iau_order : bool, optional
Beam Jones in IAU order or not
Returns
-------
int
Success message (0 or 1)
np.array
Jones matrix
float
Stokes I beam value
float
XX power beam value
float
YY power beam value
"""
ncpu = max(1, ncpu)
limit_threads(n_threads=ncpu)
if MWA_PB_file == "" or os.path.exists(MWA_PB_file) is False:
MWA_PB_file = MWA_PB_file_paircars
if sweet_spot_file == "" or os.path.exists(sweet_spot_file) is False:
sweet_spot_file = sweet_spot_file_paircars
beam = mwa_hyperbeam.FEEBeam(MWA_PB_file)
metadata = fits.getheader(metafits)
obstime = metadata["DATE-OBS"]
gridpoint = metadata["GRIDNUM"]
sweet_spots = np.load(sweet_spot_file, allow_pickle=True).all()
delay = sweet_spots[int(gridpoint)][-1]
observing_time = Time(obstime)
aa = AltAz(location=MWAPOS, obstime=observing_time)
try:
ra = float(ra)
dec = float(dec)
coord = SkyCoord(ra, dec, frame="icrs", unit="deg")
except Exception:
try:
coord = SkyCoord(ra, dec)
except Exception:
coord = SkyCoord(ra, dec, unit=(u.hourangle, u.deg))
altaz_object = coord.transform_to(aa)
alt = altaz_object.alt.degree
az = altaz_object.az.degree
# Decide whether we have arrays or scalars
is_array = isinstance(ra, np.ndarray) and isinstance(dec, np.ndarray)
# Compute Jones matrices
if is_array:
jones = beam.calc_jones_array(
np.deg2rad(az),
np.deg2rad(90 - alt),
freq * 1e6,
delay,
[1] * 16,
True,
np.deg2rad(MWALAT),
iau_order,
)
else:
jones = beam.calc_jones(
np.deg2rad(az),
np.deg2rad(90 - alt),
freq * 1e6,
delay,
[1] * 16,
True,
np.deg2rad(MWALAT),
iau_order,
)
# Promote scalar → (1,4) for uniform handling
jones = np.asarray(jones)[None, :]
# Jones shape: (N, 4)
J = np.abs(jones) ** 2
stokesI_beam = (J[:, 0] + J[:, 1] + J[:, 2] + J[:, 3]) / 2
power_beam_array_XX = J[:, 0] + J[:, 1]
power_beam_array_YY = J[:, 2] + J[:, 3]
# If scalar input, return scalars
if not is_array:
stokesI_beam = stokesI_beam[0]
power_beam_array_XX = power_beam_array_XX[0]
power_beam_array_YY = power_beam_array_YY[0]
jones = jones[0]
return 0, jones, stokesI_beam, power_beam_array_XX, power_beam_array_YY
[docs]
def get_haslam(freq, scaling=-2.55):
"""
Get the Haslam 408 MHz all sky map extrapolated to desired frequency.
Parameters
----------
freq : float
Frequency in MHz
scaling : float, optional
Power law index for extrapolation (default : -2.55)
Returns
-------
dict
A python dictionary
i) Haslam map at given frequency in 10xK unit
ii) RA in degree
iii) DEC in degree
"""
if not os.path.exists(haslam_map_paircars):
print(f"Could not find 408 MHz image: {haslam_map_paircars}.")
return None
try:
f = fits.open(haslam_map_paircars)
except Exception:
print(f"Error opening 408 MHz image: {haslam_map_paircars}.")
return None
skymap = f[0].data[0] / 10.0 # Haslam map is in 10xK
skymap = skymap * (freq / 408.0) ** scaling # Scale to frequency
RA_1D = f[0].header.get("CRVAL1") + (
np.arange(1, skymap.shape[1] + 1) - f[0].header.get("CRPIX1")
) * f[0].header.get("CDELT1")
dec_1D = f[0].header.get("CRVAL2") + (
np.arange(1, skymap.shape[0] + 1) - f[0].header.get("CRPIX2")
) * f[0].header.get("CDELT2")
return {"skymap": skymap, "RA": RA_1D, "dec": dec_1D} # RA, dec in degs
[docs]
def map_sky_haslam(skymap, RA, dec, az_grid, za_grid, obstime=""):
"""
Reprojects Haslam map onto an input az, ZA grid.
Parameters
----------
skymap : np.array
Haslam map in RA DEC
RA : np.array
1D range of RAs (deg)
dec : np.array
1D range of decs (deg)
az_grid : np.array
Grid of azes onto which we map sky
za_grid : np.array
Grid of ZAs onto which we map sky
obstime : str, optional
Time of the observation in 'yyyy-mm-dd hh:mm:ss' format
Returns
-------
numpy.array
Sky map array
"""
# Get az, ZA grid transformed to equatorial coords
grid2eq = horz2eq(az_grid, za_grid, obstime)
my_interp_fn = RegularGridInterpolator(
(dec, RA[::-1]), skymap[:, ::-1], bounds_error=False, fill_value=np.nan
)
# interpolate map onto az,ZA grid
# Convert to RA=-180 - 180 format (same as Haslam)
# We do it this way so RA values are always increasing for RegularGridInterpolator
grid2eq["RA"][grid2eq["RA"] > 180] = grid2eq["RA"][grid2eq["RA"] > 180] - 360
my_map = my_interp_fn(np.dstack([grid2eq["DEC"], grid2eq["RA"]]))
return my_map
[docs]
def map_sky(skymap, RA, dec, az_grid, za_grid, obstime=""):
"""
Reprojects Haslam map onto an input az, ZA grid.
Parameters
----------
skymap : np.array
Haslam map in RA DEC
RA : np.array
1D range of RAs (deg)
dec : np.array
1D range of decs (deg)
az_grid : np.array
Grid of azes onto which we map sky
za_grid : np.array
Grid of ZAs onto which we map sky
obstime : str, optional
Time of the observation in 'yyyy-mm-dd hh:mm:ss' format
Returns
-------
numpy.array
Sky map
"""
# Get az, ZA grid transformed to equatorial coords
grid2eq = horz2eq(az_grid, za_grid, obstime)
my_interp_fn = RegularGridInterpolator(
(dec, RA[::-1]), skymap[:, ::-1], bounds_error=False, fill_value=np.nan
)
# interpolate map onto az,ZA grid
# We do it this way so RA values are always increasing for RegularGridInterpolator
my_map = my_interp_fn(np.dstack([grid2eq["DEC"], grid2eq["RA"]]))
return my_map
[docs]
def horz2eq(az, ZA, obstime):
"""
Convert from horizontal (az, ZA) to equatorial (RA, dec)grid2eq = horz2eq(az_grid, za_grid, obstime)
Parameters
----------
az : np.array
Grid of azes onto which we map sky
za : np.array
Grid of ZAs onto which we map sky
obstime : str
Time of the observation in 'yyyy-mm-dd hh:mm:ss' format
Returns
-------
dict
A python dictionary {'RA' : degress, 'DEC' : degrees}
"""
import skyfield.api as si
MWA_TOPO = si.Topos(
longitude=(116, 40, 14.93), latitude=(-26, 42, 11.95), elevation_m=377.8
)
skyfield_loader = si.Loader(datadir, verbose=False, expire=True)
PLANETS = skyfield_loader("de421.bsp")
TIMESCALE = skyfield_loader.timescale(builtin=True)
S_MWAPOS = PLANETS["earth"] + MWA_TOPO
observing_time = Time(obstime)
observer = S_MWAPOS.at(TIMESCALE.from_astropy(observing_time))
coords = observer.from_altaz(
alt_degrees=(90 - ZA), az_degrees=az, distance=si.Distance(au=9e90)
)
ra_a, dec_a, _ = coords.radec()
return {"RA": ra_a._degrees, "DEC": dec_a.degrees}
[docs]
def get_image_info(image):
"""
Get the image data and its coordinates
Parameters
----------
image : str
Name of the image
Returns
-------
dict
A python dictionary
i) Map
ii) RA in degree
iii) DEC in degree
"""
if not os.path.exists(image):
print("Could not find 408 MHz image: %s\n" % (image))
return None
try:
f = fits.open(image)
except Exception:
print("Error opening 408 MHz image: %s\n" % (image))
return None
skymap = f[0].data[0, 0, ...]
RA_1D = f[0].header.get("CRVAL1") + (
np.arange(1, skymap.shape[1] + 1) - f[0].header.get("CRPIX1")
) * f[0].header.get(
"CDELT1"
) # /15.0
dec_1D = f[0].header.get("CRVAL2") + (
np.arange(1, skymap.shape[0] + 1) - f[0].header.get("CRPIX2")
) * f[0].header.get("CDELT2")
return {"skymap": skymap, "RA": RA_1D, "dec": dec_1D} # RA, dec in degs
[docs]
def makeAZZA_dOMEGA(npix, projection="SIN"):
"""
Make azimuth and zenith angle arrays for a square image of side npix
Parameters
----------
npix : int
Number of pixels of the grid
projection : str, optional
SIN or ZEA
Returns
-------
list
Azimuth angles in radians
list
Zenith angle in radians
int
Total number of pixels above the horizon
float
Differential solid angle (dOMEGA)
"""
# build az and za arrays
# use linspace to ensure we go to horizon on all sides
z = np.linspace(-npix / 2.0, npix / 2.0, num=npix, dtype=np.float32)
x = np.empty((npix, npix), dtype=np.float32)
y = np.empty((npix, npix), dtype=np.float32)
dOMEGA = np.empty((npix, npix), dtype=np.float32)
for i in range(npix):
y[i, 0:] = z
x[0:, i] = z
d = np.sqrt(x * x + y * y) / (npix / 2)
# only select pixels above horizon
t = d <= 1.0
n_total = t.sum()
dOMEGA.fill(np.pi * 2.00 / n_total)
za = np.zeros((npix, npix), dtype=np.float32) * np.NaN
if projection == "SIN":
za[t] = np.arcsin(d[t])
dOMEGA = np.cos(za) * np.pi * 2.00 / n_total
elif projection == "ZEA":
d = d * 2**0.5 # ZEA requires R to extend beyond 1.
za[t] = 2 * np.arcsin(d[t] / 2.0)
else:
e = "Projection %s not found" % projection
print(e)
raise ValueError(e)
az = np.arctan2(y, x)
az = az + np.pi # 0 to 2pi
az = 2 * np.pi - az # Change to clockwise from top (when origin is in top-left)
return az, za, n_total, dOMEGA
[docs]
def get_fringe(msname, freq, metafits, resolution=1, n_threads=1, baseline=[]):
"""
Function to calculate all sky fringe of a baseline
Parameters
----------
msname : str
Name of the measurement set
freq : float
Frequency in MHz
metafits : str
Name of the metafits file
resolution : float, optional
Beam resolution in degree (default : 1deg)
n_threads : int, optional
Number of cpu threads use for parallel computing
baseline : list, optional
Antenna list of a baseline
Returns
-------
np.array
All-sky fringe array in sky coornidinate
"""
n_threads = max(1, n_threads)
from casatasks import split
try:
msname = msname.rstrip("/")
baseline_str = str(baseline[0]) + "&&" + str(baseline[1])
bs_ms = (
os.path.abspath(msname).split(".ms")[0]
+ "_"
+ str(baseline[0])
+ "_"
+ str(baseline[1])
+ "_"
+ str(freq)
+ ".ms"
)
if os.path.exists(bs_ms):
os.system(f"rm -rf {bs_ms}")
split(
vis=msname,
outputvis=bs_ms,
spw="0:" + str(freq) + "MHz",
antenna=baseline_str,
datacolumn="data",
)
imsize = 512
cellsize = int(110 * 3600 / 512)
imagename_prefix = os.path.abspath(bs_ms).split(".ms")[0] + "_fringe"
wsclean_args = [
"-size " + str(imsize) + " " + str(imsize),
"-scale " + str(cellsize) + "asec",
"-niter 0",
"-make-psf-only",
"-no-fit-beam",
"-pol i",
"-j " + str(n_threads),
"-name " + imagename_prefix,
"-quiet",
]
wsclean_cmd = "wsclean " + " ".join(wsclean_args) + " " + bs_ms
run_wsclean(wsclean_cmd, "paircarswsclean", verbose=False)
wsclean_image_list = glob.glob(f"{imagename_prefix}*psf*.fits")
wsclean_psf = wsclean_image_list[0]
obstime = fits.getheader(metafits)["DATE-OBS"]
n_pix = int(360 / resolution)
az_grid, za_grid, n_total, dOMEGA = makeAZZA_dOMEGA(n_pix, "ZEA")
az_grid = az_grid * 180 / np.pi
za_grid = za_grid * 180 / np.pi
fringe_map = get_image_info(wsclean_psf)
sky_grid = map_sky(
fringe_map["skymap"],
fringe_map["RA"],
fringe_map["dec"],
az_grid,
za_grid,
obstime=obstime,
)
os.system(f"rm -rf {imagename_prefix}*")
os.system("rm -rf {bs_ms}")
return sky_grid
except Exception:
return []
[docs]
def make_primarybeammap(
msname,
metafits,
baselines=[],
freq=0,
obstime="",
resolution=1,
iau_order=True,
MWA_PB_file="",
sweet_spot_file="",
n_threads=1,
calc_fringe_temp=False,
):
"""
Parameters
----------
msname : str
Measurement set
metafits : str
Metafits file
freq : float, optional
Frequency in MHz
obstime : str, optional
Time of the observation in 'yyyy-mm-dd hh:mm:ss' format (If not given automatically obtain from metafits file)
resolution : float, optional
Beam resolution in degree (default : 1deg)
iau_order : bool, optional
Beam in IAU order or not
MWA_PB_file : str, optional
MWA primary beam file path
sweet_spot_file: str, optional
MWA sweet spot file
n_threads : int, optional
Number of cpu threads use for parallel computing
calc_fringe_temp : bool, optional
Calculate temperature contribution of the baseline
Returns
-------
float
Sum of full Beam*Sky (XX)
float
Sum of full Beam (XX)
float
Antenna temperature (XX)
float
Total beam area (XX)
float
Sum of full Beam*Sky (YY)
float
Sum of full Beam (YY)
float
Antenna temperature (YY)
float
Total beam area (YY)
"""
n_threads = max(1, n_threads)
limit_threads(n_threads=n_threads)
warnings.filterwarnings("ignore")
if MWA_PB_file == "" or os.path.exists(MWA_PB_file) is False:
MWA_PB_file = MWA_PB_file_paircars
if sweet_spot_file == "" or os.path.exists(sweet_spot_file) is False:
sweet_spot_file = sweet_spot_file_paircars
beam = mwa_hyperbeam.FEEBeam(MWA_PB_file)
############################
# Creating sky grid
############################
n_pix = int(360 / resolution)
az_grid, za_grid, n_total, dOMEGA = makeAZZA_dOMEGA(n_pix, "ZEA")
az_grid = az_grid * 180 / np.pi
za_grid = za_grid * 180 / np.pi
alt_grid = 90 - (za_grid)
# first go from altitude to zenith angle
theta = (90 - alt_grid) * np.pi / 180
phi = az_grid * np.pi / 180
###############################
# Determining beamformer delays
###############################
metadata = fits.getheader(metafits)
gridpoint = metadata["GRIDNUM"]
sweet_spots = np.load(sweet_spot_file, allow_pickle=True).all()
delay = sweet_spots[int(gridpoint)][-1]
obstime = metadata["DATE-OBS"]
#################################
# Calculating beam array
#################################
jones_array = beam.calc_jones_array(
phi.flatten(),
theta.flatten(),
freq * 10**6,
delay,
[1] * 16,
True,
np.deg2rad(MWALAT),
iau_order,
)
power_beam_array = {}
power_beam_array["XX"] = np.abs(
jones_array[:, 0] * jones_array[:, 0].conjugate()
+ jones_array[:, 1] * jones_array[:, 1].conjugate()
).reshape(az_grid.shape)
power_beam_array["YY"] = np.abs(
jones_array[:, 2] * jones_array[:, 2].conjugate()
+ jones_array[:, 3] * jones_array[:, 3].conjugate()
).reshape(az_grid.shape)
#######################################
# Get Haslam and interpolate onto grid
#######################################
haslam_map = get_haslam(freq)
mask = np.isnan(za_grid)
za_grid[np.isnan(za_grid)] = 90.0 # Replace nans as they break the interpolation
sky_grid = map_sky_haslam(
haslam_map["skymap"],
haslam_map["RA"],
haslam_map["dec"],
az_grid,
za_grid,
obstime=obstime,
)
sky_grid[mask] = np.nan # Remask beyond the horizon
#######################################
# Calculate sky fringe
#######################################
beamsky_sum_XX = 0
beam_sum_XX = 0
Tant_XX = 0
beam_dOMEGA_sum_XX = 0
beamsky_sum_YY = 0
beam_sum_YY = 0
Tant_YY = 0
beam_dOMEGA_sum_YY = 0
pols = ["XX", "YY"]
fringe_list = []
if calc_fringe_temp and len(baselines) > 0:
for bs in baselines:
fringe = get_fringe(
msname,
freq,
metafits,
resolution=resolution,
n_threads=n_threads,
baseline=bs,
)
time.sleep(0.5)
if len(fringe) > 0:
fringe_list.append(fringe)
###############################
# Calculate sky beam
###############################
for pol in pols:
# Get gridded sky
beam = power_beam_array[pol]
beamsky = beam * sky_grid
beam_dOMEGA = beam * dOMEGA
beamsky_sum = np.nansum(beamsky)
beam_sum = np.nansum(beam)
beam_dOMEGA_sum = np.nansum(beam_dOMEGA)
Tant = np.nansum(beamsky) / np.nansum(beam)
if pol == "XX":
beamsky_sum_XX = beamsky_sum
beam_sum_XX = beam_sum
Tant_XX = Tant
beam_dOMEGA_sum_XX = beam_dOMEGA_sum
T_fringe_XX = 0
if len(fringe_list) > 0:
for i in range(len(fringe_list)):
fringe = fringe_list[i]
T_fringe_XX += np.abs(np.nansum(fringe * beamsky) / np.nansum(beam))
if pol == "YY":
beamsky_sum_YY = beamsky_sum
beam_sum_YY = beam_sum
Tant_YY = Tant
beam_dOMEGA_sum_YY = beam_dOMEGA_sum
T_fringe_YY = 0
if len(fringe_list) > 0:
for i in range(len(fringe_list)):
fringe = fringe_list[i]
T_fringe_YY += np.abs(np.nansum(fringe * beamsky) / np.nansum(beam))
return (
beamsky_sum_XX,
beam_sum_XX,
Tant_XX,
beam_dOMEGA_sum_XX,
beamsky_sum_YY,
beam_sum_YY,
Tant_YY,
beam_dOMEGA_sum_YY,
T_fringe_XX,
T_fringe_YY,
)