Source code for paircars.pipeline.single_image_mwapbcor

import os
import time
import traceback
import warnings
import argparse
import sys
import glob
import numpy as np
from numpy.linalg import inv
from astropy.io import fits
from astropy.coordinates import EarthLocation
from paircars.utils.basic_utils import get_datadir
from paircars.utils.mwapb_utils import (
    get_azza_from_fits,
    get_IQUV,
    get_inst_pols,
    B2IQUV,
    get_jones_array,
)
from paircars.utils.selfcal_utils import correct_image_leakage

warnings.filterwarnings("ignore")

datadir = get_datadir()
sweet_spot_file_paircars = f"{datadir}/MWA_sweet_spots.npy"
haslam_map_paircars = f"{datadir}/haslam_map.fits"
MWALON = 116.67
MWALAT = -26.7
MWAALT = 377.8
MWAPOS = EarthLocation.from_geodetic(
    lon="116:40:14.93", lat="-26:42:11.95", height=377.8
)


[docs] def get_pbcor_image( imagename, outfile, metafits, MWA_PB_file="", sweet_spot_file="", leakage_file="", iau_order=False, pb_jones_file="", save_pb=False, interpolated=True, gridpoint=-1, nthreads=1, restore=False, ): """ Correct FITS image for MWA primary beam Parameters ---------- imagename : str Name of the input file outfile : str Basename of the outputfile metafits : str Metafits file path MWA_PB_file : str, optional MWA primary beam file path sweet_spot_file : str, optional MWA sweet spot file leakage_file : str, optional Leakage information file iau_order : bool PB Jones in IAU order or not pb_jones_file : str Numpy file with primary beam jones matrices save_pb : bool Save primary beam jones matrices for future use interpolated : bool Calculate spatially interpolated beams or not gridpoint : int MWA gridpoint number (default : -1, provide if you do not have metafits file) nthreads : int Number of cpu threads use for parallel computing restore : bool Whether correct for MWA primary beam or restore the correction Returns ------- str Output image name """ nthreads = max(1, abs(nthreads)) if MWA_PB_file == "" or os.path.exists(MWA_PB_file) is False: image_header = fits.getheader(imagename) if image_header["CTYPE3"] == "FREQ": freqres = float(image_header["CDELT3"]) / 1000.0 elif image_header["CTYPE4"] == "FREQ": freqres = float(image_header["CDELT4"]) / 1000.0 else: freqres = 160.0 beam_files = glob.glob(f"{datadir}/mwa_full_embedded_element_pattern*.h5") beam_files_freqs = [] for beamfile in beam_files: if os.path.basename(beamfile) == "mwa_full_embedded_element_pattern.h5": beam_file_freq = 1280.0 else: beam_file_freq = float( os.path.basename(beamfile) .split(".h5")[0] .split("mwa_full_embedded_element_pattern_")[-1] ) beam_files_freqs.append(beam_file_freq) beam_files_freqs = np.array(beam_files_freqs) pos = np.argmin(np.abs(beam_files_freqs - freqres)) MWA_PB_file = beam_files[pos] print(f"Primary beam file: {MWA_PB_file}") if sweet_spot_file == "" or os.path.exists(sweet_spot_file) is False: sweet_spot_file = sweet_spot_file_paircars try: if not restore: print( f"Correcting image : {os.path.basename(imagename)} for MWA primary beam response.\n" ) else: print( f"Undo the correction image : {os.path.basename(imagename)} for MWA primary beam response.\n" ) ################################### # Determining beamformer delays ################################### if metafits == "" or os.path.isfile(metafits) is False: if gridpoint == -1: print("Either provide correct metafits file or grid point number.") return else: sweet_spots = np.load(sweet_spot_file, allow_pickle=True).all() sweet_spots[int(gridpoint)][-1] else: metadata = fits.getheader(metafits) gridpoint = metadata["GRIDNUM"] sweet_spots = np.load(sweet_spot_file, allow_pickle=True).all() sweet_spots[int(gridpoint)][-1] ############################### # Reading image data and header ############################### imagename = imagename.rstrip("/") imagedata = fits.getdata(imagename) imageheader = fits.getheader(imagename) ############################## # Determining frequency ############################## if imageheader["CTYPE3"] == "FREQ": freq = imageheader["CRVAL3"] / 10**6 # In MHz elif imageheader["CTYPE4"] == "FREQ": freq = imageheader["CRVAL4"] / 10**6 else: print(f"No frequency axis in image: {imagename}.") return ############################### # Determining stokes ############################## if imageheader["CTYPE3"] == "STOKES": stokesaxis = 3 stokes = "IQUV" elif imageheader["CTYPE4"] == "STOKES": stokesaxis = 4 stokes = "IQUV" else: stokesaxis = 1 stokes = "I" #################################### # Preparing data and data grid #################################### if stokes == "I": imagedata = np.repeat(imagedata, 4, 0) alt_az_array = get_azza_from_fits(imagename, metafits) if os.path.exists(pb_jones_file) is False: jones_array = get_jones_array( 90 - np.rad2deg(alt_az_array["za_rad"].flatten()), np.rad2deg(alt_az_array["astro_az_rad"].flatten()), freq, gridpoint, ncpu=nthreads, interpolated=interpolated, MWA_PB_file=MWA_PB_file, sweet_spot_file=sweet_spot_file, iau_order=iau_order, ) if save_pb: if pb_jones_file == "": pb_jones_file = "mwapb.npy" else: pb_jones_file = pb_jones_file.rstrip(".npy") + ".npy" print(f"Saving primary beam Jones matrices in: {pb_jones_file}\n") np.save( pb_jones_file, np.array([iau_order, jones_array], dtype="object") ) elif os.path.exists(pb_jones_file): print(f"Loading primary beam Jones matrices from : {pb_jones_file}\n") pb = np.load(pb_jones_file, allow_pickle=True) pb_jones_order = pb[0] jones_array = pb[1] expected_shape = (alt_az_array["astro_az_rad"].flatten().shape[0], 2, 2) if jones_array.shape != expected_shape or pb_jones_order != iau_order: if pb_jones_order != iau_order: print( "Given primary beam convention does not match with intented convention. Re-estimating primary beam Jones\n" ) else: print( "Loaded primary beam Jones are of different shape. Re-estimating primary beam Jones.\n" ) jones_array = get_jones_array( 90 - np.rad2deg(alt_az_array["za_rad"].flatten()), np.rad2deg(alt_az_array["astro_az_rad"].flatten()), freq, gridpoint, ncpu=nthreads, interpolated=interpolated, MWA_PB_file=MWA_PB_file, sweet_spot_file=sweet_spot_file, iau_order=iau_order, ) if restore is False: stokes = get_IQUV(imagename, stokesaxis=stokesaxis) Vij = get_inst_pols(stokes, iau_order=not iau_order) Vij_reshaped = Vij.reshape(Vij.shape[0] * Vij.shape[1], 2, 2) jones_array_H = np.transpose(jones_array.conj(), axes=((0, 2, 1))) Vij_corrected = np.matmul( inv(jones_array), np.matmul(Vij_reshaped, inv(jones_array_H)) ) Vij_reshaped = Vij_corrected.reshape(Vij.shape) B = B2IQUV(Vij_reshaped, iau_order=iau_order) else: stokes = get_IQUV(imagename, stokesaxis=stokesaxis) Vij = get_inst_pols(stokes, iau_order=iau_order) Vij_reshaped = Vij.reshape(Vij.shape[0] * Vij.shape[1], 2, 2) jones_array_H = np.transpose(jones_array.conj(), axes=((0, 2, 1))) Vij_corrected = np.matmul( jones_array, np.matmul(Vij_reshaped, jones_array_H) ) Vij_reshaped = Vij_corrected.reshape(Vij.shape) B = B2IQUV(Vij_reshaped, iau_order=not iau_order) if stokesaxis == 3: imagedata[0, 0, :, :] = np.real(B["I"]) imagedata[0, 1, :, :] = np.real(B["Q"]) imagedata[0, 2, :, :] = np.real(B["U"]) imagedata[0, 3, :, :] = np.real(B["V"]) fullpol = True elif stokesaxis == 4: imagedata[0, 0, :, :] = np.real(B["I"]) imagedata[1, 0, :, :] = np.real(B["Q"]) imagedata[2, 0, :, :] = np.real(B["U"]) imagedata[3, 0, :, :] = np.real(B["V"]) fullpol = True else: imagedata[0, 0, :, :] = np.real(B["I"]) fullpol = False if os.path.exists(outfile): os.system(f"rm -rf {outfile}") fits.writeto(outfile, data=imagedata, header=imageheader, overwrite=True) if fullpol: if leakage_file != "" and os.path.exists(leakage_file): try: ( q_leakage, u_leakage, v_leakage, res_q_leakage, res_u_leakage, res_v_leakage, ) = np.load(leakage_file, allow_pickle=True) ######################################################################################## # Image based leakage correction if polarisation selfcal solutions could not be applied, but leakage information available ######################################################################################## header = fits.getheader(outfile) if "POLSELF" in header.keys(): if header["POLSELF"] == "FALSE": leakagecor_image, _ = correct_image_leakage( outfile, modelname="", q_leakage=q_leakage, u_leakage=u_leakage, v_leakage=v_leakage, ) if os.path.exists(leakagecor_image): os.system(f"rm -rf {outfile}") os.system(f"mv {leakagecor_image} {outfile}") with fits.open(outfile, mode="update") as hdul: hdr = hdul[0].header hdr["IMGLEAKCOR"] = "TRUE" ##################################################### # Writing leakage information ###################################################### with fits.open(outfile, mode="update") as hdul: hdr = hdul[0].header hdr["LEAKUNIT"] = "PERCENT" if np.isnan(res_q_leakage): hdr["QLEAK"] = "NAN" else: hdr["QLEAK"] = abs(round(res_q_leakage * 100.0, 4)) if np.isnan(res_u_leakage): hdr["ULEAK"] = "NAN" else: hdr["ULEAK"] = abs(round(res_u_leakage * 100.0, 4)) if np.isnan(res_v_leakage): hdr["VLEAK"] = "NAN" else: hdr["VLEAK"] = abs(round(res_v_leakage * 100.0, 4)) except Exception: traceback.print_exc() print(f"Output image written to : {outfile}\n") return outfile except Exception: traceback.print_exc() return
[docs] def cli(): parser = argparse.ArgumentParser( description="Correct images for MWA primary beam response" ) # Essential parameters basic_args = parser.add_argument_group( "###################\nEssential parameters\n###################" ) basic_args.add_argument( "imagename", type=str, help="Name of the image file", ) basic_args.add_argument( "metafits", type=str, help="Name of the metafits file", ) basic_args.add_argument( "outfile", type=str, help="Output file name", ) # Advanced parameters adv_args = parser.add_argument_group( "###################\nAdvanced parameters\n###################" ) adv_args.add_argument( "--MWA_PB_file", type=str, default="", help="MWA primary beam file", ) adv_args.add_argument( "--sweetspot_file", type=str, default="", help="MWA primary beam sweetspot file path", ) adv_args.add_argument( "--leakage_file", type=str, default="", help="Leakage information file", ) adv_args.add_argument( "--iau_order", action="store_true", help="PB Jones in IAU order", ) adv_args.add_argument( "--gridpoint", type=int, default=-1, help="MWA sweet spot pointing number", ) adv_args.add_argument( "--restore", action="store_true", help="Restore the primary beam correction", ) adv_args.add_argument( "--pb_jones_file", default="mwapb.npy", help="NumPy file of PB Jones matrices", ) adv_args.add_argument( "--save_pb", action="store_true", help="Save PB Jones matrices", ) adv_args.add_argument( "--interpolated", action="store_true", help="Use interpolated beam model", ) # Resource management parameters hard_args = parser.add_argument_group( "###################\nHardware resource management parameters\n###################" ) hard_args.add_argument( "--num_threads", type=int, default=1, help="Number of CPU threads to use", ) if len(sys.argv) == 1: parser.print_help(sys.stderr) return 1 args = parser.parse_args() time.time() try: get_pbcor_image( args.imagename, args.outfile, args.metafits, MWA_PB_file=args.MWA_PB_file, sweet_spot_file=args.sweetspot_file, leakage_file=args.leakage_file, iau_order=args.iau_order, pb_jones_file=args.pb_jones_file, save_pb=args.save_pb, interpolated=args.interpolated, gridpoint=args.gridpoint, nthreads=args.num_threads, restore=args.restore, ) return 0 except Exception: traceback.print_exc() return 1