import logging
import numpy as np
import argparse
import time
import sys
import os
from casatools import msmetadata
from dask import delayed
from functools import partial
from astropy.io import fits
from paircars.utils.basic_utils import (
suppress_output,
weighted_mean,
print_banner,
)
from paircars.utils.calibration import (
get_caltable_metadata,
get_quartical_table_metadata,
)
from paircars.utils.flagging import (
get_unflagged_antennas,
get_chans_flag,
do_flag_backup,
)
from paircars.utils.imaging import (
calc_field_of_view,
calc_cellsize,
get_fft_size,
)
from paircars.utils.logger_utils import (
SmartDefaultsHelpFormatter,
clean_shutdown,
create_logger,
init_logger,
get_logger_safe,
)
from paircars.utils.ms_metadata import (
check_datacolumn_valid,
)
from paircars.utils.mwa_utils import (
freq_to_MWA_coarse,
get_MWA_OBSID,
get_MWA_coarse_chan,
)
from paircars.utils.proc_manage_utils import (
scale_worker_and_wait,
get_local_dask_cluster,
)
from paircars.utils.resource_utils import drop_cache, limit_threads
from paircars.utils.selfcal_utils import (
quiet_sun_selfcal,
selfcal_round,
flag_non_disk,
do_uvsub_flag,
)
from paircars.utils.udocker_utils import (
check_udocker_container,
initialize_wsclean_container,
initialize_quartical_container,
)
logging.getLogger("distributed").setLevel(logging.ERROR)
logging.getLogger("tornado.application").setLevel(logging.CRITICAL)
[docs]
def do_selfcal(
msname="",
workdir="",
selfcaldir="",
metafits="",
cal_applied=True,
refant="",
start_threshold=5,
end_threshold=3,
max_iter=30,
max_DR=100000,
min_iter=3,
DR_convergence_frac=0.1,
uvrange="",
minuv=0,
solint="60s",
weight="briggs",
robust=0.0,
do_apcal=True,
min_tol_factor=1.0,
applymode="calonly",
solar_selfcal=True,
use_solarflagger=False,
ncpu=1,
mem=1,
logfile="intselfcal.log",
):
"""
Do selfcal iterations and use convergence rules to stop
Parameters
----------
msname : str
Name of the measurement set
workdir : str
Work directory
selfcaldir : str
Working directory
metafits : str
Metafits file
cal_applied : bool, optional
Basic calibration applied or not
refant : str, optional
Reference antenna
start_threshold : int, optional
Start CLEAN threhold
end_threshold : int, optional
End CLEAN threshold
max_iter : int, optional
Maximum numbers of selfcal iterations (In each selfcal mode)
max_DR : float, optional
Maximum dynamic range
min_iter : int, optional
Minimum numbers of seflcal iterations at different stages
DR_convergence_frac : float, optional
Dynamic range fractional change to consider as converged
uvrange : str, optional
UV-range for calibration
minuv : float, optionial
Minimum UV-lambda to use in imaging
solint : str, optional
Solutions interval
weight : str, optional
Imaging weighting
robust : float, optional
Briggs weighting robust parameter (-1 to 1)
do_apcal : bool, optional
Perform ap-selfcal or not
min_tol_factor : float, optional
Minimum tolerable variation in temporal direction in percentage
applymode : str, optional
Solution apply mode
solar_selfcal : bool, optional
Whether is is solar selfcal or not
use_solarflagger : bool, optional
Use solar flagging or not
ncpu : int, optional
Number of CPU threads to use
mem : float, optional
Memory in GB to use
logfile : str, optional
Log file name
Returns
-------
int
Success message
str
Self-calibrated measurement set
str
Final caltable
bool
Whether non-disk data chunks flagging was successful or not
float
Final dynamic range
"""
ncpu = max(1, ncpu)
mem = abs(mem)
limit_threads(n_threads=ncpu)
from casatasks import split, flagmanager
sub_observer = None
intlogger, logfile = create_logger(
os.path.basename(logfile).split(".log")[0],
logfile,
)
if os.path.exists(f"{workdir}/.jobname_password.npy") and logfile is not None:
time.sleep(5)
jobname, password = np.load(
f"{workdir}/.jobname_password.npy", allow_pickle=True
)
if os.path.exists(logfile):
sub_observer = init_logger(
"remotelogger_intselfcal_{os.path.basename(msname).split('.ms')[0]}",
logfile,
jobname=jobname,
password=password,
)
try:
msname = os.path.abspath(msname.rstrip("/"))
selfcaldir = selfcaldir.rstrip("/")
if os.path.exists(selfcaldir):
intlogger.info(
f"Removing pre-existing intensity selfcal directory: {selfcaldir}"
)
os.system(f"rm -rf {selfcaldir}")
os.makedirs(selfcaldir, exist_ok=True)
os.chdir(selfcaldir)
selfcalms = selfcaldir + "/intselfcal_" + os.path.basename(msname)
if os.path.exists(selfcalms):
os.system("rm -rf " + selfcalms)
if os.path.exists(selfcalms + ".flagversions"):
os.system("rm -rf " + selfcalms + ".flagversions")
##############################
# Restoring any previous flags
##############################
with suppress_output():
flags = flagmanager(vis=msname, mode="list")
keys = flags.keys()
for k in keys:
if k == "MS":
pass
else:
version = flags[0]["name"]
try:
with suppress_output():
flagmanager(vis=msname, mode="restore", versionname=version)
flagmanager(vis=msname, mode="delete", versionname=version)
except BaseException:
pass
if os.path.exists(msname + ".flagversions"):
os.system("rm -rf " + msname + ".flagversions")
#########################################
# Determining calibration applied or not
#########################################
fluxscale_mwa = False
solar_attn = 1
if cal_applied and os.path.exists(f"{msname}/.applied_sol") is False:
cal_applied = False
if cal_applied is False:
fluxscale_mwa = True
if os.path.exists(metafits) is False:
intlogger.error(
"Calibration solutions were not applied and target metafits is also not supplied. Provide any one of them."
)
return 1, msname, [], False, 0
solar_attn = float(fits.getheader(metafits)["ATTEN_DB"])
applymode = "calflag"
##############################
# Spliting corrected data
##############################
hascor = check_datacolumn_valid(msname, datacolumn="CORRECTED_DATA")
msmd = msmetadata()
msmd.open(msname)
scan = int(msmd.scannumbers()[0])
field = int(msmd.fieldsforscan(scan)[0])
msmd.close()
if hascor:
intlogger.info(f"Spliting corrected data to ms : {selfcalms}")
with suppress_output():
split(
vis=msname,
field=str(field),
scan=str(scan),
outputvis=selfcalms,
datacolumn="corrected",
)
else:
intlogger.info(f"Spliting data to ms : {selfcalms}")
with suppress_output():
split(
vis=msname,
field=str(field),
scan=str(scan),
outputvis=selfcalms,
datacolumn="data",
)
msname = selfcalms
################################################################
# Initial flagging -- zeros, extreme bad data, and non-disk data
################################################################
intlogger.info("Checking initial flagging...")
unflag_chans, flag_chans = get_chans_flag(msname)
if len(unflag_chans) > 0:
temp_ms = f"{msname}.tempsplit"
unflag_chans = [f"{i}" for i in unflag_chans]
unflag_spw = f"0:{';'.join(unflag_chans)}"
intlogger.info(f"Spliting only unflagged spectral window: {unflag_spw}")
split(vis=msname, outputvis=temp_ms, datacolumn="all", spw=unflag_spw)
os.system(f"rm -rf {msname} {msname}.flagversions")
os.system(f"mv {temp_ms} {msname}")
############################################
# Imaging and calibration parameters
############################################
intlogger.info("Estimating imaging Parameters.")
cellsize = calc_cellsize(msname, 3)
instrument_fov = calc_field_of_view(msname, FWHM=False)
cutout_rsun_arcsec = 10 * 16 * 60 # 10 solar radii
fov = min(instrument_fov, 2 * cutout_rsun_arcsec)
imsize = int(fov / cellsize)
imsize = get_fft_size(imsize)
if refant == "":
unflagged_antenna_names, flag_frac_list = get_unflagged_antennas(msname)
msmd = msmetadata()
msmd.open(msname)
refant_ids = sorted(
[msmd.antennaids(antname)[0] for antname in unflagged_antenna_names]
)[0]
refant = str(refant_ids)
msmd.close()
############################################
# Initiating selfcal Parameters
############################################
intlogger.info("Estimating self-calibration parameters.")
DR1 = 0.0
DR2 = 0.0
DR3 = 0.0
RMS1 = -1.0
RMS2 = -1.0
RMS3 = -1.0
num_iter = 0
num_iter_after_ap = 0
num_iter_fixed_sigma = 0
num_iter_after_flag = 0
last_sigma_DR1 = 0
sigma_reduced_count = 0
calmode = "p"
threshold = start_threshold
last_round_gaintable = []
last_round_ms = ""
use_previous_model = False
nondisk_flag = True
min_DR = 0
issue_occured = False
min_iter = max(3, min_iter) # Minimum 3 iterations
do_flag = False
restore_flag = True
os.system("rm -rf *_selfcal_present*")
###########################################
# Starting using Gaussian model
###########################################
intlogger.info("Starting self-calibration using Gaussian source model.")
msg, _ = quiet_sun_selfcal(
msname, intlogger, selfcaldir, refant=str(refant), solint=solint
)
if msg == 0:
intlogger.info(
"Starting self-calibration using Gaussian model is successful."
)
else:
if msg == 2:
nondisk_flag = False
intlogger.warning(
"Starting self-calibration using Gaussian model is not successful."
)
##########################################
# Starting selfcal loops
##########################################
while True:
##################################
# Selfcal round parameters
##################################
issue_occured = False # Resetting it in every round
intlogger.info("######################################")
intlogger.info(
"Selfcal iteration : "
+ str(num_iter)
+ ", Threshold: "
+ str(threshold)
+ ", Calibration mode: "
+ str(calmode)
)
intlogger.info("######################################")
if num_iter_after_flag > 0 and do_flag:
do_flag = False
if (
DR3 < 0.85 * DR2 and DR3 < 0.9 * DR2
): # If DR is decreasing, restore flags
restore_flag = True
min_iter += 1
intlogger.warning(
"DR is decreaging after flagging. Restoring previous uv-domain flags."
)
else:
restore_flag = False
(
msg,
gaintable,
dyn,
rms,
final_image,
final_model,
final_residual,
_,
max_pixel_offset,
) = selfcal_round(
msname,
metafits,
intlogger,
selfcaldir,
cellsize,
imsize,
round_number=num_iter,
uvrange=uvrange,
minuv=minuv,
calmode=calmode,
solint=solint,
refant=str(refant),
applymode=applymode,
min_tol_factor=min_tol_factor,
threshold=threshold,
use_previous_model=use_previous_model,
weight=weight,
robust=robust,
use_solar_mask=solar_selfcal,
fluxscale_mwa=fluxscale_mwa,
do_intensity_cal=True,
do_polcal=False,
solar_attn=solar_attn,
do_flag=do_flag,
restore_flag=restore_flag,
ncpu=ncpu,
mem=round(mem, 2),
)
if msg == 1:
if num_iter == 0:
intlogger.warning(
"No model flux is picked up in first round. Trying with lowest threshold.\n"
)
(
msg,
gaintable,
dyn,
rms,
final_image,
final_model,
final_residual,
_,
max_pixel_offset,
) = selfcal_round(
msname,
metafits,
intlogger,
selfcaldir,
cellsize,
imsize,
round_number=num_iter,
uvrange=uvrange,
minuv=minuv,
calmode=calmode,
solint=solint,
refant=str(refant),
applymode=applymode,
min_tol_factor=min_tol_factor,
threshold=end_threshold,
use_previous_model=False,
weight=weight,
robust=robust,
use_solar_mask=solar_selfcal,
fluxscale_mwa=fluxscale_mwa,
do_intensity_cal=True,
do_polcal=False,
solar_attn=solar_attn,
do_flag=False,
restore_flag=True,
ncpu=ncpu,
mem=round(mem, 2),
)
if msg == 1:
os.system("rm -rf *_selfcal_present*")
time.sleep(5)
clean_shutdown(sub_observer)
return msg, msname, [], nondisk_flag, 0
else:
threshold = end_threshold
else:
os.system("rm -rf *_selfcal_present*")
return msg, msname, [], nondisk_flag, 0
elif msg > 1:
intlogger.error("Self-calibration failed.")
os.system("rm -rf *_selfcal_present*")
time.sleep(5)
clean_shutdown(sub_observer)
return msg, msname, [], nondisk_flag, 0
if num_iter == 0:
DR1 = DR3 = DR2 = dyn
RMS1 = RMS3 = RMS2 = rms
elif num_iter == 1:
DR3 = dyn
RMS3 = rms
min_DR = dyn
else:
DR1 = DR2
DR2 = DR3
DR3 = dyn
RMS1 = RMS2
RMS2 = RMS3
RMS3 = rms
intlogger.info("######################################")
intlogger.info(
"RMS based dynamic ranges: "
+ str(DR1)
+ ","
+ str(DR2)
+ ","
+ str(DR3)
)
intlogger.info(
"RMS of the images: " + str(RMS1) + "," + str(RMS2) + "," + str(RMS3)
)
intlogger.info(f"Maximum pixel offset: {max_pixel_offset}")
if DR3 > 1.1 * DR2 and (
calmode == "p" or (calmode == "ap" and num_iter_after_ap > 1)
):
use_previous_model = True
else:
use_previous_model = False
#################################
# Checking DR decrease conditions
#################################
######################################################################
# Condition 1: If DR is decreasing (DR decrease in phase-only selfcal)
# Condition 2: If DR suddenly decreased or decreased below starting DR after apcal
# Condition 3: If DR is decreasing, DR decrease in amplitude-phase selfcal
######################################################################
cond1 = (
(DR3 < 0.85 * DR2 and DR3 < 0.9 * DR1 and DR2 > DR1)
and calmode == "p"
and num_iter > min_iter
)
cond2 = (
(DR3 < 0.7 * DR2 or DR3 < 0.9 * min_DR)
and calmode == "ap"
and num_iter_after_ap > 1
)
cond3 = (
DR3 < 0.9 * DR2
and DR2 > 1.1 * DR1
and calmode == "ap"
and num_iter_after_ap > min_iter
)
if cond1 or cond2 or cond3:
issue_occured = True
################################
# Replacing with previous ms
#################################
if os.path.exists(last_round_ms):
os.system(f"rm -rf {msname}")
os.system(f"cp -r {last_round_ms} {msname}")
#############################################
# Printing condition message
##############################################
if cond1:
intlogger.warning(
"Dynamic range decreasing in phase-only self-cal."
)
if cond2:
intlogger.warning(
"Dynamic range dropped suddenly or drop below starting dynamic range."
)
if cond3:
intlogger.warning(
"Dynamic range is decreasing after minimum numbers of 'ap' round.\n"
)
##################################################
# Performing steps
##################################################
if do_apcal and calmode == "p":
intlogger.info("Changed calmode to 'ap'.")
calmode = "ap"
use_previous_model = False
elif calmode == "ap" and threshold > end_threshold:
threshold -= 1
intlogger.info(f"Reducing threshold to: {threshold}")
else:
if not use_solarflagger and DR3 < 100:
use_solarflagger = True
if use_solarflagger and not do_flag and num_iter_after_flag == 0:
intlogger.info("Trying uvsub flagging.")
do_flag = True
num_iter_after_flag += 1
use_previous_model = False
else:
intlogger.warning(
"Stopping self-calibration. Using last round caltable as final.\n"
)
if not use_solarflagger and DR3 < 100:
intlogger.info(
"Performing final flagging because DR is less than 100."
)
do_uvsub_flag(msname, threshold_list=[10, 7, 5], ncpu=ncpu)
os.system("rm -rf *_selfcal_present*")
time.sleep(5)
clean_shutdown(sub_observer)
return 0, msname, last_round_gaintable, nondisk_flag, DR2
###########################
# If maximum DR has reached
###########################
if DR3 > max_DR and num_iter_after_ap > min_iter:
intlogger.info("Maximum dynamic range is reached.\n")
os.system("rm -rf *_selfcal_present*")
time.sleep(5)
clean_shutdown(sub_observer)
return 0, msname, gaintable, nondisk_flag, DR3
###########################
# Checking DR convergence
###########################
# Condition 1
# (If DR did not increase after one round of sigma reduction, do not reduce sigma further and exit)
###########################
if (
((do_apcal and calmode == "ap") or not do_apcal)
and num_iter_fixed_sigma > min_iter
and (
last_sigma_DR1 > 0
and abs(round(np.nanmedian([DR1, DR2, DR3]), 0) - last_sigma_DR1)
/ last_sigma_DR1
< DR_convergence_frac
)
and sigma_reduced_count > 1
):
if threshold > end_threshold:
intlogger.info(
"DR does not increase over last two changes in threshold, but minimum threshold has not reached yet.\n"
)
intlogger.info(
"Starting final self-calibration rounds with threshold = "
+ str(end_threshold)
+ "sigma...\n"
)
threshold = end_threshold
sigma_reduced_count += 1
num_iter_fixed_sigma = 0
else:
if not use_solarflagger and DR3 < 100:
intlogger.info(
"Performing final flagging because DR is less than 100."
)
do_uvsub_flag(msname, threshold_list=[10, 7, 5], ncpu=ncpu)
intlogger.info("Selfcal calibration has converged.\n")
os.system("rm -rf *_selfcal_present*")
time.sleep(5)
clean_shutdown(sub_observer)
return 0, msname, gaintable, nondisk_flag, DR3
else:
################################################################
# Condition 2
# If DR does not increase a certain percentage
# If threshold not reached to end threshold, reducing threshold
################################################################
if (
abs(DR1 - DR2) / DR2 < DR_convergence_frac
and num_iter > min_iter
and num_iter_fixed_sigma > min_iter
and threshold > end_threshold
):
#####################################
# Change from phase only selfcal to amplitude-phase selfcal
#####################################
if do_apcal and calmode == "p":
intlogger.info(
"Dynamic range converged. Changing calmode to 'ap'.\n"
)
calmode = "ap"
use_previous_model = False
######################################
# Reducing threshold if already in apcal
######################################
elif (do_apcal and num_iter_after_ap > min_iter) or not do_apcal:
threshold -= 1
intlogger.info("Reducing threshold to : " + str(threshold))
sigma_reduced_count += 1
num_iter_fixed_sigma = 0
if last_sigma_DR1 > 0:
last_sigma_DR1 = round(np.nanmean([DR1, DR2, DR3]), 0)
else:
last_sigma_DR1 = round(np.nanmean([DR1, DR2, DR3]), 0)
######################################
# Condition 3
# If threshold reached, converged
######################################
elif (
abs(DR1 - DR2) / DR2 < DR_convergence_frac
and num_iter > min_iter
and num_iter_fixed_sigma > min_iter
and threshold == end_threshold
):
if not use_solarflagger and DR3 < 100:
intlogger.info(
"Performing final flagging because DR is less than 100."
)
do_uvsub_flag(msname, threshold_list=[10, 7, 5], ncpu=ncpu)
intlogger.info("Self-calibration has converged.\n")
os.system("rm -rf *_selfcal_present*")
time.sleep(5)
clean_shutdown(sub_observer)
return 0, msname, gaintable, nondisk_flag, DR3
#########################################
# In apcal and maximum iteration has reached
#########################################
elif num_iter > min_iter and (
(not do_apcal and num_iter == max_iter)
or (do_apcal and calmode == "ap" and num_iter_after_ap == max_iter)
):
if not use_solarflagger and DR3 < 100:
intlogger.info(
"Performing final flagging because DR is less than 100."
)
do_uvsub_flag(msname, threshold_list=[10, 7, 5], ncpu=ncpu)
intlogger.info(
"Self-calibration is finished. Maximum iteration is reached.\n"
)
os.system("rm -rf *_selfcal_present*")
time.sleep(5)
clean_shutdown(sub_observer)
return 0, msname, gaintable, nondisk_flag, DR3
num_iter += 1
os.system(f"cp -r {msname} {msname}.round{num_iter}")
if calmode == "ap":
num_iter_after_ap += 1
if num_iter_after_ap > 0:
if not use_solarflagger and DR3 < 100:
use_solarflagger = True
if (
use_solarflagger
and not do_flag
and num_iter_after_flag == 0
and num_iter_after_ap == 0
):
intlogger.info("Trying uvsub flagging.")
do_flag = True
num_iter_after_flag += 1
num_iter_fixed_sigma += 1
if not issue_occured:
last_round_gaintable = gaintable
last_round_ms = f"{msname}.lastround"
if os.path.exists(last_round_ms):
os.system(f"rm -rf {last_round_ms}")
os.system(f"cp -r {msname} {last_round_ms}")
except Exception:
intlogger.exception(
"Exception occured in intensity self-calibration", exc_info=True
)
os.system("rm -rf *_selfcal_present*")
time.sleep(5)
clean_shutdown(sub_observer)
return 1, msname, [], False, 0
[docs]
def do_polselfcal(
msname="",
workdir="",
selfcaldir="",
metafits="",
refant="",
max_iter=10,
max_DR=100000,
min_iter=5,
threshold=3.0,
DR_convergence_frac=0.1,
uvrange="",
minuv=0,
weight="briggs",
robust=0.0,
solar_selfcal=True,
use_solarflagger=False,
try_nondisk_flag=True,
ncpu=1,
mem=1,
logfile="polselfcal.log",
):
"""
Do selfcal iterations and use convergence rules to stop
Parameters
----------
msname : str
Name of the measurement set
workdir : str
Work directory
selfcaldir : str
Working directory
metafits : str
Metafits file
refant : str, optional
Reference antenna
max_iter : int, optional
Maximum numbers of selfcal iterations
max_DR : float, optional
Maximum dynamic range
min_iter : int, optional
Minimum numbers of seflcal iterations at different stages
threshold: float, optional
Threshold of CLEANing
DR_convergence_frac : float, optional
Dynamic range fractional change to consider as converged
uvrange : str, optional
UV-range for calibration
minuv : float, optionial
Minimum UV-lambda to use in imaging
weight : str, optional
Imaging weighting
robust : float, optional
Briggs weighting robust parameter (-1 to 1)
solar_selfcal : bool, optional
Whether is is solar selfcal or not
use_solarflagger : bool, optional
Use solar flagger or not
try_nondisk_flag : bool, optional
Try to flag non-disk data chunks or not
ncpu : int, optional
Number of CPU threads to use
mem : float, optional
Memory in GB to use
logfile : str, optional
Log file name
Returns
-------
int
Success message
str
Polarisation self-calibrated measurement set
str
Final caltable
str
Leakage file
float
Final image dynamic range
"""
ncpu = max(1, ncpu)
mem = abs(mem)
limit_threads(n_threads=ncpu)
from casatasks import split, flagdata, flagmanager
sub_observer = None
pollogger, logfile = create_logger(
os.path.basename(logfile).split(".log")[0],
logfile,
)
if os.path.exists(f"{workdir}/.jobname_password.npy") and logfile is not None:
time.sleep(5)
jobname, password = np.load(
f"{workdir}/.jobname_password.npy", allow_pickle=True
)
if os.path.exists(logfile):
sub_observer = init_logger(
"remotelogger_polselfcal_{os.path.basename(msname).split('.ms')[0]}",
logfile,
jobname=jobname,
password=password,
)
try:
msname = os.path.abspath(msname.rstrip("/"))
selfcaldir = selfcaldir.rstrip("/")
if os.path.exists(selfcaldir):
pollogger.info(
f"Removing pre-existing polarisation selfcal directory: {selfcaldir}"
)
os.system(f"rm -rf {selfcaldir}")
os.makedirs(selfcaldir, exist_ok=True)
os.chdir(selfcaldir)
selfcalms = selfcaldir + "/polselfcal_" + os.path.basename(msname)
if os.path.exists(selfcalms):
os.system(f"rm -rf {selfcalms}")
if os.path.exists(f"{selfcalms}.flagversions"):
os.system(f"rm -rf {selfcalms}.flagversions")
################################
# Trying to flag non-disk chunks
################################
split_spw = ""
if try_nondisk_flag:
pollogger.info(
"Non-disk data chunk flagging was successful during intensity self-calibration. Trying before polarisation self-calibration."
)
do_flag_backup(msname, flagtype="nondisk")
result = flag_non_disk(msname)
if result == 0:
pollogger.info("Flagged non-disk data chunks.")
unflag_chans, flag_chans = get_chans_flag(msname)
split_spw = "0:" + ";".join([str(i) for i in unflag_chans])
else:
pollogger.warning("Could not flag non-disk data chunks.")
split_spw = ""
with suppress_output():
if result != 0:
flagmanager(vis=msname, mode="restore", versionname="nondisk_1")
flagmanager(vis=msname, mode="delete", versionname="nondisk_1")
##############################
# Spliting corrected data
##############################
hascor = check_datacolumn_valid(msname, datacolumn="CORRECTED_DATA")
msmd = msmetadata()
msmd.open(msname)
scan = int(msmd.scannumbers()[0])
field = int(msmd.fieldsforscan(scan)[0])
msmd.close()
if hascor:
pollogger.info(f"Spliting corrected data to ms : {selfcalms}")
with suppress_output():
split(
vis=msname,
spw=split_spw,
field=str(field),
scan=str(scan),
outputvis=selfcalms,
datacolumn="corrected",
)
else:
pollogger.warning("Corrected data column is not present.")
pollogger.info(f"Spliting data to ms : {selfcalms}")
with suppress_output():
split(
vis=msname,
spw=split_spw,
field=str(field),
scan=str(scan),
outputvis=selfcalms,
datacolumn="data",
)
msname = selfcalms
################################################################
# Initial flagging -- zeros, extreme bad data
################################################################
pollogger.info("Checking initial flagging.")
with suppress_output():
flagdata(
vis=msname,
mode="clip",
clipzeros=True,
datacolumn="data",
flagbackup=False,
)
unflag_chans, flag_chans = get_chans_flag(msname)
if len(unflag_chans) > 0:
temp_ms = f"{msname}.tempsplit"
unflag_chans = [f"{i}" for i in unflag_chans]
unflag_spw = f"0:{';'.join(unflag_chans)}"
pollogger.info(f"Spliting only unflagged spectral window: {unflag_spw}")
split(vis=msname, outputvis=temp_ms, datacolumn="all", spw=unflag_spw)
os.system(f"rm -rf {msname} {msname}.flagversions")
os.system(f"mv {temp_ms} {msname}")
############################################
# Imaging and calibration parameters
############################################
pollogger.info("Estimating imaging Parameters.")
cellsize = calc_cellsize(msname, 3)
instrument_fov = calc_field_of_view(msname, FWHM=False)
cutout_rsun_arcsec = 10 * 16 * 60 # 10 solar radii
fov = min(instrument_fov, 2 * cutout_rsun_arcsec)
imsize = int(fov / cellsize)
imsize = get_fft_size(imsize)
if refant == "":
unflagged_antenna_names, flag_frac_list = get_unflagged_antennas(msname)
msmd = msmetadata()
msmd.open(msname)
refant_ids = sorted(
[msmd.antennaids(antname)[0] for antname in unflagged_antenna_names]
)[0]
refant = str(refant_ids)
msmd.close()
############################################
# Initiating selfcal Parameters
############################################
pollogger.info("Estimating self-calibration parameters.")
DR1 = 0.0
DR2 = 0.0
DR3 = 0.0
RMS1 = -1.0
RMS2 = -1.0
RMS3 = -1.0
QL1 = QL2 = QL3 = 1.0
UL1 = UL2 = UL3 = 1.0
VL1 = VL2 = VL3 = 1.0
num_iter = 0
calc_chunks = True
do_flag = False
restore_flag = True
num_iter_after_flag = 0
last_round_gaintable = []
last_leakage_file = ""
last_round_ms = ""
solve_array_leakage = True
issue_occured = False
num_iter_after_reset = 0
min_iter = max(5, min_iter) # Minimum 5 iterations
os.system("rm -rf *_selfcal_present*")
##########################################
# Starting selfcal loops
##########################################
while True:
issue_occured = False # Reseting in every round
##################################
# Selfcal round parameters
##################################
pollogger.info("######################################")
pollogger.info("Selfcal iteration : " + str(num_iter))
pollogger.info("######################################")
if num_iter == 0:
pbcor = True
leakagecor = True
pbuncor = False
elif num_iter < 3:
pbcor = False
leakagecor = True
pbuncor = False
elif num_iter == 3:
pbcor = False
leakagecor = True
pbuncor = True
else:
if num_iter == min_iter:
solve_array_leakage = False # This is to make sure if it failed, last round ms has same state of polcal
pbcor = True
leakagecor = True
pbuncor = True
if num_iter_after_flag > 0 and do_flag:
do_flag = False
if (
DR3 < 0.9 * DR2
): # If DR after flagging is smaller than 90% of last round DR, restore flags
pollogger.warning(
"Restoring previous uv-bin flags because dynamic range did not improve."
)
restore_flag = True
min_iter += 1
else:
restore_flag = False
(
msg,
gaintable,
dyn,
rms,
final_image,
final_model,
final_residual,
leakage_info,
max_pixel_offset,
) = selfcal_round(
msname,
metafits,
pollogger,
selfcaldir,
cellsize,
imsize,
round_number=num_iter,
uvrange=uvrange,
minuv=minuv,
calc_chunks=calc_chunks,
refant=str(refant),
threshold=threshold,
weight=weight,
robust=robust,
use_solar_mask=solar_selfcal,
do_polcal=True,
do_intensity_cal=False,
pbcor=pbcor,
leakagecor=leakagecor,
pbuncor=pbuncor,
do_flag=do_flag,
restore_flag=restore_flag,
solve_array_leakage=solve_array_leakage,
ncpu=ncpu,
mem=round(mem, 2),
)
if msg == 1:
pollogger.error("No model flux is picked up.")
os.system("rm -rf *_selfcal_present*")
return msg, msname, [], "", 0
elif msg > 2:
pollogger.error("Polarisation self-calibration failed.")
os.system("rm -rf *_selfcal_present*")
time.sleep(5)
clean_shutdown(sub_observer)
return msg, msname, [], "", 0
elif msg == 2:
if calc_chunks is False:
if num_iter > min_iter:
pollogger.warning(
"Minor issues in polarisation self-calibration model prediction. Stopped at previous round."
)
if os.path.exists(last_round_ms):
os.system(f"rm -rf {msname}")
os.system(f"cp -r {last_round_ms} {msname}")
os.system("rm -rf *_selfcal_present*")
time.sleep(5)
clean_shutdown(sub_observer)
return 0, msname, last_round_gaintable, last_leakage_file, DR2
else:
issue_occured = True
pollogger.error(
"Minor issues in polarisation self-calibration model prediction. Minimum iteration has not covered."
)
os.system("rm -rf *_selfcal_present*")
time.sleep(5)
clean_shutdown(sub_observer)
return msg, msname, [], "", 0
else:
issue_occured = True
pollogger.warning(
"Minor issues in polarisation self-calibration model prediction. Retrying with entire spectro-temporal chunks."
)
calc_chunks = False
else:
leakage_info = np.array(leakage_info)
Q = leakage_info[:, 0]
U = leakage_info[:, 1]
V = leakage_info[:, 2]
Qe = leakage_info[:, 3]
Ue = leakage_info[:, 4]
Ve = leakage_info[:, 5]
q_leakage, q_err = weighted_mean(Q, Qe)
u_leakage, u_err = weighted_mean(U, Ue)
v_leakage, v_err = weighted_mean(V, Ve)
leakage_file = f"{gaintable[0].split('.dcal')[0]}.leakage.npy"
np.save(
leakage_file, [q_leakage, u_leakage, v_leakage, q_err, u_err, v_err]
)
if num_iter == 0:
DR1 = DR3 = DR2 = dyn
RMS1 = RMS2 = RMS3 = rms
QL1 = QL2 = QL3 = q_leakage
UL1 = UL2 = UL3 = u_leakage
VL1 = VL2 = VL3 = v_leakage
min_DR = dyn
elif num_iter == 1:
DR3 = dyn
RMS3 = rms
QL3 = q_leakage
UL3 = u_leakage
VL3 = v_leakage
else:
DR1 = DR2
DR2 = DR3
DR3 = dyn
RMS1 = RMS2
RMS2 = RMS3
RMS3 = rms
QL1 = QL2
UL1 = UL2
VL1 = VL2
QL2 = QL3
UL2 = UL3
VL2 = VL3
QL3 = q_leakage
UL3 = u_leakage
VL3 = v_leakage
pollogger.info("######################################")
pollogger.info(
"RMS based dynamic ranges: "
+ str(DR1)
+ ","
+ str(DR2)
+ ","
+ str(DR3)
)
pollogger.info(
"RMS of the images: "
+ str(RMS1)
+ ","
+ str(RMS2)
+ ","
+ str(RMS3)
)
pollogger.info(
f"Stokes I to Q leakage: {round(QL1*100.0,3)}, {round(QL2*100.0,3)}, {round(QL3*100.0,3)}%."
)
pollogger.info(
f"Stokes I to U leakage: {round(UL1*100.0,3)}, {round(UL2*100.0,3)}, {round(UL3*100.0,3)}%."
)
pollogger.info(
f"Stokes I to V leakage: {round(VL1*100.0,3)}, {round(VL2*100.0,3)}, {round(VL3*100.0,3)}%."
)
pollogger.info(f"Maximum pixel offset: {max_pixel_offset}")
leakage_converged = (QL3 == 0.0 and UL3 == 0.0 and VL3 == 0.0) or (
(QL2 - QL3) <= 0.01 and (UL2 - UL3) <= 0.01 and (VL2 - VL3) <= 0.01
)
########################################
# Leakage pr big DR related issues
#########################################
###################################################################
# Condition 1: If solving per antenna decrease DR, solve per array
###################################################################
if not solve_array_leakage and (DR3 < 0.9 * DR2 or RMS3 > 1.1 * RMS2):
pollogger.warning(
"Solving over array instead of antenna, as DR decreases."
)
solve_array_leakage = True
issue_occured = True
num_iter_after_reset = 0
if os.path.exists(last_round_ms):
pollogger.info("Replacing with previous measurement set.")
os.system(f"rm -rf {msname}")
os.system(f"cp -r {last_round_ms} {msname}")
##########################################
# Condition 2: If leakage increased
##########################################
if (num_iter == 2 or num_iter > 4) and (
(abs(QL3) - abs(QL2)) > 0.1
or (abs(UL3) - abs(UL2)) > 0.1
or (abs(VL3) - abs(VL2)) > 0.1
):
issue_occured = True
pollogger.warning("Leakage increased by 10%.")
if os.path.exists(last_round_ms):
pollogger.info("Replacing with previous measurement set.")
num_iter -= 1
os.system(f"rm -rf {msname}")
os.system(f"cp -r {last_round_ms} {msname}")
#########################################
# Condition 3: If leakage becomes nan
#########################################
if np.isnan(QL3) or np.isnan(UL3) or np.isnan(VL3):
issue_occured = True
if num_iter == 0:
pollogger.error(
"Leakages become nan. Serious calibration issue occured at the first round."
)
return 1, msname, [], "", 0
pollogger.warning(
"Leakages become nan. Serious calibration issue occured."
)
if os.path.exists(last_round_ms):
pollogger.info("Replacing with previous measurement set.")
num_iter -= 1
os.system(f"rm -rf {msname}")
os.system(f"cp -r {last_round_ms} {msname}")
if not solve_array_leakage:
num_iter_after_reset = 0
pollogger.info("Solving over array instead of antenna.")
solve_array_leakage = True
else:
if not use_solarflagger and DR3 < 100:
use_solarflagger = True
if (
use_solarflagger
and not do_flag
and num_iter_after_flag == 0
):
pollogger.info("Trying uvsub flagging.")
do_flag = True
restore_flag = False
else:
os.system("rm -rf *_selfcal_present*")
time.sleep(5)
clean_shutdown(sub_observer)
if num_iter > min_iter:
pollogger.warning(
"Stopping self-calibration. Using last round caltables."
)
return (
0,
msname,
last_round_gaintable,
last_leakage_file,
DR2,
)
else:
pollogger.error(
"Leakages become nan. Serious calibration issue occured at the before completing minimum rounds."
)
return 1, msname, [], "", 0
################################
# DR decraeses
################################
# Condition 1: If DR decreased below starting DR
# Condition 2: If DR is decreasing (DR decrease in pol selfcal)
# Condition 3: If DR suddenly decreased
###############################################################
cond1 = DR3 < 0.9 * min_DR and num_iter_after_reset > 1
cond2 = (
(DR3 < 0.9 * DR2 and DR2 > 1.5 * DR1)
and num_iter > min_iter
and num_iter_after_reset > 1
and leakage_converged
)
cond3 = (
DR3 < 0.7 * DR2
and num_iter > min_iter
and num_iter_after_reset > 1
and leakage_converged
)
if cond1 or cond2 or cond3:
##############################
# Printing condition messages
##############################
if cond1:
pollogger.warning(
f"Dynamic range decreased below start dynamic range: {min_DR}."
)
if cond2:
pollogger.warning(
"Dynamic range is decreasing after minimum numbers of rounds."
)
if cond3:
pollogger.warning(
"Dynamic range dropped suddenly. Using last round caltable as final."
)
###################################
# Replacing previous ms
###################################
issue_occured = True
if os.path.exists(last_round_ms):
pollogger.info("Replacing with previous measurement set.")
os.system(f"rm -rf {msname}")
os.system(f"cp -r {last_round_ms} {msname}")
if not solve_array_leakage:
num_iter_after_reset = 0
pollogger.info("Solving over array instead of antenna.")
solve_array_leakage = True
else:
if not use_solarflagger and DR3 < 100:
use_solarflagger = True
if (
use_solarflagger
and not do_flag
and num_iter_after_flag == 0
):
pollogger.info("Trying uvsub flagging.")
do_flag = True
num_iter_after_flag += 1
else:
if num_iter > min_iter:
pollogger.warning(
"Stopping self-calibration. Using last round caltables."
)
os.system("rm -rf *_selfcal_present*")
time.sleep(5)
clean_shutdown(sub_observer)
return (
0,
msname,
last_round_gaintable,
last_leakage_file,
DR2,
)
else:
pollogger.error(
"Encountered this error before minimum number of rounds."
)
return 1, msname, [], "", 0
###########################
# If maximum DR has reached
###########################
if (
DR3 > max_DR
and num_iter > min_iter
and num_iter_after_reset > 1
and leakage_converged
):
pollogger.info("Maximum dynamic range is reached.\n")
os.system("rm -rf *_selfcal_present*")
time.sleep(5)
clean_shutdown(sub_observer)
return 0, msname, gaintable, leakage_file, DR3
###########################
# Checking DR convergence
###########################
########################################
# Condition 1
# If DR does not increase a certain percentage
# Leakage becomes zero or did not reduce
########################################
if (
abs(DR1 - DR2) / DR2 < DR_convergence_frac
and num_iter > min_iter
and num_iter_after_reset > 1
and leakage_converged
):
pollogger.info("Self-calibration has converged.\n")
os.system("rm -rf *_selfcal_present*")
time.sleep(5)
clean_shutdown(sub_observer)
return 0, msname, gaintable, leakage_file, DR3
#########################################
# If maximum iteration has reached
#########################################
elif (
num_iter > min_iter
and num_iter_after_reset > 1
and num_iter == max_iter
):
pollogger.info(
"Self-calibration is finished. Maximum iteration is reached.\n"
)
if leakage_converged is False:
pollogger.warning("Leakage did not converge.\n")
os.system("rm -rf *_selfcal_present*")
time.sleep(5)
clean_shutdown(sub_observer)
return 0, msname, gaintable, leakage_file, DR3
num_iter += 1
num_iter_after_reset += 1
os.system(f"cp -r {msname} {msname}.round{num_iter}")
if not issue_occured:
last_round_gaintable = gaintable
last_leakage_file = leakage_file
last_round_ms = f"{msname}.lastround"
if os.path.exists(last_round_ms):
os.system(f"rm -rf {last_round_ms}")
os.system(f"cp -r {msname} {last_round_ms}")
except Exception:
pollogger.exception(
"Exception occured in polarisation self-calibration.", exc_info=True
)
os.system("rm -rf *_selfcal_present*")
time.sleep(5)
clean_shutdown(sub_observer)
return 1, msname, [], "", 0
[docs]
def do_full_selfcal(
msname="",
workdir="",
selfcaldir="",
metafits="",
cal_applied=True,
start_threshold=5,
end_threshold=3,
max_iter=30,
max_DR=100000,
intselfcal_min_iter=3,
polselfcal_min_iter=5,
DR_convergence_frac=0.1,
uvrange="",
minuv=0,
solint="60s",
weight="briggs",
robust=0.0,
do_apcal=True,
do_polcal=True,
min_tol_factor=1.0,
applymode="calonly",
solar_selfcal=True,
use_solarflagger=False,
ncpu=1,
mem=1,
logfile_prefix="selfcal",
logger=None,
):
"""
Perform both intensity and polarisation self-calibration
Returns
-------
int
Intensity selfcal success message
int
Polarisation selfcal success message
list
Intensity selfcal gaintables
list
Polarisation selfcal gaintables
str
Leakage information file
float
Final intensity selfcal dynamic range
float
Final polarisation selfcal dynamic range
"""
if logger is None:
logger = get_logger_safe()
ncpu = max(1, ncpu)
mem = abs(mem)
selfcaldir = selfcaldir.rstrip("/")
logfile_prefix = logfile_prefix.rstrip("/")
logger.info(f"Starting intensity self-calibration for ms: {msname}.")
unflagged_antenna_names, flag_frac_list = get_unflagged_antennas(msname)
msmd = msmetadata()
msmd.open(msname)
refant_ids = sorted(
[msmd.antennaids(antname)[0] for antname in unflagged_antenna_names]
)[0]
refant = str(refant_ids)
msmd.close()
intensity_selfcal_msg, selfcal_ms, gaintable, try_nondisk_flag, int_DR = do_selfcal(
msname=msname,
workdir=workdir,
selfcaldir=f"{selfcaldir}_int",
metafits=metafits,
cal_applied=cal_applied,
refant=str(refant),
start_threshold=start_threshold,
end_threshold=end_threshold,
max_iter=max_iter,
max_DR=max_DR,
min_iter=max(3, intselfcal_min_iter),
DR_convergence_frac=DR_convergence_frac,
uvrange=uvrange,
minuv=minuv,
solint=solint,
weight=weight,
robust=robust,
do_apcal=do_apcal,
min_tol_factor=min_tol_factor,
applymode=applymode,
solar_selfcal=solar_selfcal,
use_solarflagger=use_solarflagger,
ncpu=ncpu,
mem=mem,
logfile=f"{logfile_prefix}_int.log",
)
if intensity_selfcal_msg != 0:
logger.error("Error occured in intensity self-calibration.")
return intensity_selfcal_msg, 1, [], [], "", int_DR, 0
elif do_polcal is False:
logger.info("Polarisation calibration is not requested.")
return 0, 1, gaintable, [], "", int_DR, 0
else:
logger.info(f"Starting polarisation self-calibration for ms: {msname}.")
pol_selfcal_msg, pol_selfcal_ms, quartical_table, leakage_file, pol_DR = (
do_polselfcal(
msname=selfcal_ms,
workdir=workdir,
selfcaldir=f"{selfcaldir}_pol",
metafits=metafits,
max_iter=max(10, int(max_iter / 3)),
max_DR=max_DR,
min_iter=max(5, polselfcal_min_iter),
threshold=end_threshold,
DR_convergence_frac=DR_convergence_frac,
uvrange=uvrange,
minuv=minuv,
weight=weight,
robust=robust,
solar_selfcal=solar_selfcal,
use_solarflagger=use_solarflagger,
try_nondisk_flag=try_nondisk_flag,
ncpu=ncpu,
mem=mem,
logfile=f"{logfile_prefix}_pol.log",
)
)
return (
intensity_selfcal_msg,
pol_selfcal_msg,
gaintable,
quartical_table,
leakage_file,
int_DR,
pol_DR,
)
[docs]
def main(
mslist,
metafits,
workdir,
caldir,
cal_applied=True,
start_thresh=5,
stop_thresh=3,
max_iter=30,
max_DR=100000,
intselfcal_min_iter=3,
polselfcal_min_iter=5,
conv_frac=0.1,
solint="60s",
uvrange="",
minuv=0,
weight="briggs",
robust=0.0,
applymode="calonly",
min_tol_factor=1.0,
do_apcal=True,
do_polcal=True,
solar_selfcal=True,
use_solarflagger=False,
keep_backup=False,
cpu_frac=0.8,
mem_frac=0.8,
logfile=None,
jobid=0,
verbose=False,
start_remote_log=False,
dask_client=None,
):
"""
Perform iterative self-calibration on a list of measurement sets.
Parameters
----------
mslist : str
Comma-separated list of target measurement sets to be self-calibrated.
metafits : str
Metafits file
workdir : str
Path to the working directory for outputs, intermediate files, and logs.
caldir : str
Directory containing calibration tables (e.g., from flux or phase calibrators).
cal_applied : bool, optional
Basic initial calibration applied or not.
start_thresh : float, optional
Initial image dynamic range threshold to start self-calibration. Default is 5.
stop_thresh : float, optional
Target dynamic range at which to stop iterative self-calibration. Default is 3.
max_iter : int, optional
Maximum number of self-calibration iterations. Default is 30.
max_DR : float, optional
Maximum dynamic range allowed before halting iterations. Default is 100000.
intselfcal_min_iter : int, optional
Minimum number of iterations before checking for convergence for intensity selfcal. Default is 3.
polselfcal_min_iter : int, optional
Minimum number of iterations before checking for convergence for polarisation selfcal. Default is 5.
conv_frac : float, optional
Convergence criterion: fractional change in dynamic range below which iteration stops. Default is 0.1.
solint : str, optional
Solution interval for gain calibration (e.g., "inf", "10s", "int"). Default is "60s".
uvrange : str, optional
UV range to be used for imaging and calibration, in CASA format. Default is "" (all baselines).
minuv : float, optional
Minimum baseline length (in wavelengths) to include. Default is 10.
weight : str, optional
Weighting scheme for imaging (e.g., "natural", "uniform", "briggs"). Default is "briggs".
robust : float, optional
Robustness parameter for Briggs weighting (ignored if not using "briggs"). Default is 0.0.
applymode : str, optional
Apply mode for calibration tables ("calonly", "calflag", etc.). Default is "calonly".
min_tol_factor : float, optional
Minimum factor for tolerance comparison during convergence checks. Default is 1.0.
do_apcal : bool, optional
Whether to apply polarization and bandpass calibration before starting selfcal. Default is True.
do_polcal : bool, optional
Whether perform polarisation self-calibration or not
solar_selfcal : bool, optional
If True, uses solar-specific masking and flux normalization. Default is True.
use_solarflagger : bool, optional
Use solar flagger or not. Default is False.
keep_backup : bool, optional
If True, keeps backup MS before applying selfcal solutions. Default is False.
cpu_frac : float, optional
Fraction of available CPUs to use per job. Default is 0.8.
mem_frac : float, optional
Fraction of available system memory to use per job. Default is 0.8.
logfile : str, optional
Log file name
jobid : int, optional
Identifier for job tracking and logging. Default is 0.
verbose : bool, optional
Verbose logs
start_remote_log : bool, optional
Whether to initiate remote logging via job credentials. Default is False.
dask_client : dask.client, optional
Dask client
Returns
-------
int
Success message
int
Intensity selfcal success number
int
Intensity selfcal failed number
int
Polarisation selfcal success number
int
Polarisation selfcal failed nunber
float
Average intensity selfcal dynamic range
float
Average polarisation selfcal dynamic range
float
Maximum intensity selfcal dynamic range
float
Maximum polarisation selfcal dynamic range
"""
logger = get_logger_safe()
if verbose:
logger.setLevel(logging.DEBUG)
cpu_frac = min(0.8, abs(cpu_frac))
mem_frac = min(0.8, abs(mem_frac))
mslist = mslist.split(",")
if workdir == "":
workdir = os.path.dirname(os.path.abspath(mslist[0])) + "/workdir"
os.makedirs(workdir, exist_ok=True)
os.chdir(workdir)
logger.debug(f"Current working directory: {os.getcwd()}")
if caldir == "" or not os.path.exists(caldir):
caldir = f"{workdir}/caltables"
os.makedirs(caldir, exist_ok=True)
logger.debug(f"Output caltables directory: {caldir}.")
############
# Logger
############
observer = None
if (
start_remote_log
and os.path.exists(f"{workdir}/.jobname_password.npy")
and logfile is not None
):
time.sleep(5)
jobname, password = np.load(
f"{workdir}/.jobname_password.npy", allow_pickle=True
)
if os.path.exists(logfile):
observer = init_logger(
"all_selfcal", logfile, jobname=jobname, password=password
)
if observer is None:
logger.info(
"Remote link or jobname is blank. Not transmiting to remote logger."
)
if len(mslist) == 0:
logger.critical("Please provide a valid measurement set list.")
return 1, 0, 0, 0, 0, 0, 0, 0, 0
else:
int_succeed = 0
int_failed = len(mslist)
pol_succeed = 0
pol_failed = len(mslist)
avg_int_DR = 0
avg_pol_DR = 0
max_int_DR = 0
max_pol_DR = 0
dask_cluster = None
if dask_client is None:
dask_client, dask_cluster, dask_dir, nworker = get_local_dask_cluster(
workdir,
cpu_frac=cpu_frac,
mem_frac=mem_frac,
max_worker=len(mslist) + 1,
)
if dask_client is None:
logger.critical("Error occured in creating local cluster.")
return 1, 0, 0, 0, 0, 0, 0, 0, 0
scale_worker_and_wait(dask_cluster, dask_client, nworker)
###########################
# WSClean container
###########################
container_name = "paircarswsclean"
container_present = check_udocker_container(container_name)
if not container_present:
logger.debug(f"Initializing {container_name}.")
container_name = initialize_wsclean_container(name=container_name, verbose=True)
if container_name is None:
logger.critical(
f"Container {container_name} is not initiated. First initiate container and then run."
)
return 1, int_succeed, int_failed, pol_succeed, pol_failed, 0, 0, 0, 0
if do_polcal:
container_name = "paircarsquartical"
container_present = check_udocker_container(container_name)
if not container_present:
logger.debug(f"Initializing {container_name}.")
container_name = initialize_quartical_container(
name=container_name, verbose=True
)
if container_name is None:
logger.critical(
f"Container {container_name} is not initiated. First initiate container and then run."
)
return 1, int_succeed, int_failed, pol_succeed, pol_failed, 0, 0, 0, 0
try:
for banner in print_banner(
"Starting self-calibrations.", no_print=True
).splitlines():
logger.info(banner)
if do_polcal and do_apcal is False:
logger.info(
"Polarisation self-calibration is requested without amplitude-phase intensity self-calibration. Switching off polarisation self-calibration."
)
do_polcal = False
header = fits.getheader(metafits)
obsid = header["GPSTIME"]
partial_do_selfcal = partial(
do_full_selfcal,
metafits=str(metafits),
cal_applied=bool(cal_applied),
start_threshold=float(start_thresh),
end_threshold=float(stop_thresh),
max_iter=int(max_iter),
max_DR=float(max_DR),
intselfcal_min_iter=int(intselfcal_min_iter),
polselfcal_min_iter=int(polselfcal_min_iter),
DR_convergence_frac=float(conv_frac),
uvrange=str(uvrange),
minuv=float(minuv),
solint=str(solint),
weight=str(weight),
robust=float(robust),
do_apcal=do_apcal,
do_polcal=do_polcal,
applymode=applymode,
min_tol_factor=float(min_tol_factor),
solar_selfcal=solar_selfcal,
use_solarflagger=use_solarflagger,
logger=logger,
)
####################################
# Filtering any corrupted ms
#####################################
filtered_mslist = [] # Filtering in case any ms is corrupted
for ms in mslist:
checkcol = check_datacolumn_valid(ms)
if checkcol:
filtered_mslist.append(ms)
else:
logger.warning(f"Issue in : {ms}")
os.system(f"rm -rf {ms}")
mslist = filtered_mslist
if len(mslist) == 0:
logger.critical("No filtered ms to continue.")
return 1, int_succeed, int_failed, pol_succeed, pol_failed, 0, 0, 0, 0
client_info = dask_client.scheduler_info()["workers"]
njobs = len(client_info)
worker_mem_list = []
for addr, w in client_info.items():
worker_mem_list.append(w["memory_limit"] / 1024**3)
if len(worker_mem_list) > 0:
mem_limit = round(min(worker_mem_list), 3)
else:
mem_limit = 1
n_threads = os.environ.get("OMP_NUM_THREADS")
if n_threads is not None:
n_threads = int(n_threads)
else:
n_threads = 1
logger.info("#################################")
logger.info(f"Total dask worker: {njobs}")
logger.info(f"CPU per worker: {n_threads}")
logger.info(f"Memory per worker: {mem_limit} GB")
logger.info("#################################")
os.makedirs(f"{workdir}/logs", exist_ok=True)
tasks = []
for ms in mslist:
obsid = get_MWA_OBSID(ms)
coarse_chan = get_MWA_coarse_chan(ms)
if len(coarse_chan) > 1:
coarse_chan = f"{min(coarse_chan)}-{max(coarse_chan)}"
else:
coarse_chan = f"{min(coarse_chan)}"
logfile_prefix = f"{workdir}/logs/selfcal_{obsid}_ch_{coarse_chan}"
logger.info(f"Measurement set name: {ms}.")
logger.info(f"Self-cal log file: {logfile_prefix}_int.log")
if do_polcal:
logger.info(f"Polarisation self-cal log file: {logfile_prefix}_pol.log")
tasks.append(
delayed(partial_do_selfcal)(
ms,
workdir,
workdir + "/" + os.path.basename(ms).split(".ms")[0] + "_selfcal",
ncpu=n_threads,
mem=mem_limit,
logfile_prefix=logfile_prefix,
)
)
logger.info("Starting all self-calibration.")
results = list(dask_client.gather(dask_client.compute(tasks)))
gcal_list = []
bpass_list = []
dcal_list = []
leakage_file_list = []
succeed_intselfcal = 0
failed_intselfcal = 0
succeed_polselfcal = 0
failed_polselfcal = 0
int_DR_list = []
pol_DR_list = []
for i in range(len(results)):
r = results[i]
int_msg = r[0]
int_DR = r[5]
pol_DR = r[6]
int_DR_list.append(int_DR)
pol_DR_list.append(pol_DR)
if int_msg != 0:
logger.error(
f"Intensity self-calibration was not successful for ms: {mslist[i]}."
)
os.system(
f"touch {workdir}/.intselfcal_failed_{os.path.basename(mslist[i])}"
)
failed_intselfcal += 1
else:
try:
gaintables = r[2]
gcal = gaintables[0]
cal_metadata = get_caltable_metadata(gcal)
freq_start = cal_metadata["Channel 0 frequency (MHz)"]
bw = cal_metadata["Bandwidth (MHz)"]
freq_end = freq_start + bw
ch_start = freq_to_MWA_coarse(freq_start)
ch_end = freq_to_MWA_coarse(freq_end)
if ch_end > ch_start:
coarse_chan = f"{ch_start}-{ch_end}"
else:
coarse_chan = f"{ch_start}"
final_gain_caltable = (
caldir + f"/selfcal_{obsid}_ch_{coarse_chan}.gcal"
)
os.system(f"rm -rf {final_gain_caltable}")
os.system(f"cp -r {gcal} {final_gain_caltable}")
gcal_list.append(final_gain_caltable)
if len(gaintables) > 1:
bpass = gaintables[1]
cal_metadata = get_caltable_metadata(bpass)
freq_start = cal_metadata["Channel 0 frequency (MHz)"]
bw = cal_metadata["Bandwidth (MHz)"]
freq_end = freq_start + bw
ch_start = freq_to_MWA_coarse(freq_start)
ch_end = freq_to_MWA_coarse(freq_end)
if ch_end > ch_start:
coarse_chan = f"{ch_start}-{ch_end}"
else:
coarse_chan = f"{ch_start}"
final_bpass_caltable = (
caldir + f"/selfcal_{obsid}_ch_{coarse_chan}.bcal"
)
os.system(f"rm -rf {final_bpass_caltable}")
os.system(f"cp -r {bpass} {final_bpass_caltable}")
bpass_list.append(final_bpass_caltable)
os.system(
f"touch {workdir}/.intselfcal_succeed_{os.path.basename(mslist[i])}"
)
succeed_intselfcal += 1
except Exception:
logger.exception(
"Exception occured in filtering intensity self-calibration caltables.",
exc_info=True,
)
os.system(
f"touch {workdir}/.intselfcal_failed_{os.path.basename(mslist[i])}"
)
failed_intselfcal += 1
if do_polcal:
pol_msg = r[1]
if pol_msg != 0:
logger.error(
f"Polarisation self-calibration was not successful for ms: {mslist[i]}."
)
os.system(
f"touch {workdir}/.polselfcal_failed_{os.path.basename(mslist[i])}"
)
failed_polselfcal += 1
else:
try:
leakage_file = r[4]
quartical_tables = r[3]
dcal = quartical_tables[0]
cal_metadata = get_quartical_table_metadata(dcal)
freq_start = cal_metadata["Channel 0 frequency (MHz)"]
bw = cal_metadata["Bandwidth (MHz)"]
freq_end = freq_start + bw
ch_start = freq_to_MWA_coarse(freq_start)
ch_end = freq_to_MWA_coarse(freq_end)
if ch_end > ch_start:
coarse_chan = f"{ch_start}-{ch_end}"
else:
coarse_chan = f"{ch_start}"
final_leakage_caltable = (
caldir + f"/selfcal_{obsid}_ch_{coarse_chan}.dcal"
)
os.system(f"rm -rf {final_leakage_caltable}")
os.system(f"cp -r {dcal} {final_leakage_caltable}")
dcal_list.append(final_leakage_caltable)
final_leakage_info = (
caldir + f"/selfcal_{obsid}_ch_{coarse_chan}.leakage"
)
os.system(f"rm -rf {final_leakage_info}")
os.system(f"cp -r {leakage_file} {final_leakage_info}")
leakage_file_list.append(final_leakage_info)
os.system(
f"touch {workdir}/.polselfcal_succeed_{os.path.basename(mslist[i])}"
)
succeed_polselfcal += 1
except Exception:
logger.exception(
"Error occured in filtering polarisation self-calibration caltables.",
exc_info=True,
)
os.system(
f"touch {workdir}/.polselfcal_failed_{os.path.basename(mslist[i])}"
)
failed_polselfcal += 1
if not keep_backup:
for ms in mslist:
int_selfcaldir = (
workdir
+ "/"
+ os.path.basename(ms).split(".ms")[0]
+ "_selfcal_int"
)
os.system(f"rm -rf {int_selfcaldir}")
pol_selfcaldir = (
workdir
+ "/"
+ os.path.basename(ms).split(".ms")[0]
+ "_selfcal_pol"
)
os.system(f"rm -rf {pol_selfcaldir}")
if len(gcal_list) > 0:
logger.info("Final gaincal selfcal caltables:")
for gcal in gcal_list:
logger.info(gcal)
msg = 0
if len(bpass_list) > 0:
logger.info("Final bandpass selfcal caltables:")
for bpass in bpass_list:
logger.info(bpass)
else:
logger.warning("No bandpass self-calibration is present.")
if len(dcal_list) > 0:
logger.info("Final polarisation selfcal caltables:")
for dcal in dcal_list:
logger.info(dcal)
else:
logger.error("No self-calibration is successful.")
msg = 1
logger.info(f"Total self-calibration measurement sets: {len(mslist)}")
logger.info(
f"Total successful intensity self-calibration: {succeed_intselfcal}"
)
logger.info(f"Total failed intensity self-calibration: {failed_intselfcal}")
int_succeed, int_failed = succeed_intselfcal, failed_intselfcal
if do_polcal:
logger.info(
f"Total successful polarisation self-calibration: {succeed_polselfcal}"
)
logger.info(
f"Total failed polarisation self-calibration: {failed_polselfcal}"
)
pol_succeed, pol_failed = succeed_polselfcal, failed_polselfcal
if succeed_intselfcal == 0:
msg = 1
if len(int_DR_list) > 0:
avg_int_DR = round(np.nanmedian(int_DR_list), 2)
max_int_DR = round(np.nanmax(int_DR_list), 2)
else:
avg_int_DR = 0
max_int_DR = 0
if len(pol_DR_list) > 0:
avg_pol_DR = round(np.nanmedian(pol_DR_list), 2)
max_pol_DR = round(np.nanmax(pol_DR_list), 2)
else:
avg_pol_DR = 0
max_pol_DR = 0
logger.info(f"Average intensity self-calibration dynamic range: {avg_int_DR}")
logger.info(
f"Average polarisation self-calibration dynamic range: {avg_pol_DR}"
)
except Exception:
logger.exception("Exception occured in self-calibration.", exc_info=True)
msg = 1
avg_int_DR = 0
avg_pol_DR = 0
max_int_DR = 0
max_pol_DR = 0
finally:
time.sleep(5)
clean_shutdown(observer)
for ms in mslist:
if os.path.exists(ms):
drop_cache(ms)
if dask_cluster is not None:
dask_client.shutdown()
dask_client.close()
dask_cluster.close()
drop_cache(workdir)
os.system(f"rm -rf {dask_dir}")
return (
msg,
int_succeed,
int_failed,
pol_succeed,
pol_failed,
avg_int_DR,
avg_pol_DR,
max_int_DR,
max_pol_DR,
)
[docs]
def cli():
parser = argparse.ArgumentParser(
description="Self-calibration", formatter_class=SmartDefaultsHelpFormatter
)
# Essential parameters
basic_args = parser.add_argument_group(
"###################\nEssential parameters\n###################"
)
basic_args.add_argument(
"mslist",
type=str,
help="Comma-separated list of measurement sets (required positional argument)",
)
basic_args.add_argument(
"metafits",
type=str,
help="Metafits file",
)
basic_args.add_argument(
"--workdir",
type=str,
default="",
required=True,
help="Working directory",
)
basic_args.add_argument(
"--caldir",
type=str,
default="",
required=True,
help="Caltable directory",
)
# Advanced parameters
adv_args = parser.add_argument_group(
"###################\nAdvanced calibration and imaging parameters\n###################"
)
adv_args.add_argument(
"--no_cal_applied",
action="store_false",
dest="cal_applied",
help="Basic calibration is not applied",
)
adv_args.add_argument(
"--start_thresh",
type=float,
default=5,
help="Starting CLEANing threshold",
metavar="Float",
)
adv_args.add_argument(
"--stop_thresh",
type=float,
default=3,
help="Stop CLEANing threshold",
metavar="Float",
)
adv_args.add_argument(
"--max_iter",
type=int,
default=30,
help="Maximum number of selfcal iterations (in each mode, phaseonly, amplitude-phase, polselfcal)",
metavar="Integer",
)
adv_args.add_argument(
"--max_DR",
type=float,
default=100000,
help="Maximum dynamic range",
metavar="Float",
)
adv_args.add_argument(
"--intselfcal_min_iter",
type=int,
default=3,
help="Minimum number of intensity selfcal iterations",
metavar="Integer",
)
adv_args.add_argument(
"--polselfcal_min_iter",
type=int,
default=5,
help="Minimum number of polarisation selfcal iterations",
metavar="Integer",
)
adv_args.add_argument(
"--conv_frac",
type=float,
default=0.1,
help="Fractional change in DR to determine convergence",
metavar="Float",
)
adv_args.add_argument("--solint", type=str, default="60s", help="Solution interval")
adv_args.add_argument(
"--uvrange",
type=str,
default="",
help="Calibration UV-range (CASA format)",
)
adv_args.add_argument(
"--minuv",
type=float,
default=0,
help="Minimum UV-lambda used for imaging",
metavar="Float",
)
adv_args.add_argument("--weight", type=str, default="briggs", help="Imaging weight")
adv_args.add_argument(
"--robust",
type=float,
default=0.0,
help="Robust parameter for briggs weight",
metavar="Float",
)
adv_args.add_argument(
"--applymode",
type=str,
default="calonly",
help="Solution apply mode",
metavar="String",
)
adv_args.add_argument(
"--min_tol_factor",
type=float,
default=1.0,
help="Minimum tolerable variation in temporal direction in percentage",
metavar="Float",
)
adv_args.add_argument(
"--no_apcal",
action="store_false",
dest="do_apcal",
help="Do not perform ap-selfcal",
)
adv_args.add_argument(
"--no_polcal",
action="store_false",
dest="do_polcal",
help="Do not perform polarisation selfcal",
)
adv_args.add_argument(
"--no_solar_selfcal",
action="store_false",
dest="solar_selfcal",
help="Do not perform solar self-calibration",
)
adv_args.add_argument(
"--use_solarflagger",
action="store_true",
help="Use solar flagger or not",
)
adv_args.add_argument(
"--keep_backup",
action="store_true",
help="Keep backup of self-calibration rounds",
)
adv_args.add_argument("--verbose", action="store_true", help="Verbose logs")
adv_args.add_argument("--jobid", type=int, default=0, help="Job ID")
# Resource management parameters
hard_args = parser.add_argument_group(
"###################\nHardware resource management parameters\n###################"
)
hard_args.add_argument(
"--cpu_frac",
type=float,
default=0.8,
help="CPU fraction to use",
metavar="Float",
)
hard_args.add_argument(
"--mem_frac",
type=float,
default=0.8,
help="Memory fraction to use",
metavar="Float",
)
if len(sys.argv) == 1:
parser.print_help(sys.stderr)
return 1
args = parser.parse_args()
msg, _, _, _, _, _, _, _, _ = main(
mslist=args.mslist,
metafits=args.metafits,
workdir=args.workdir,
cal_applied=args.cal_applied,
caldir=args.caldir,
start_thresh=args.start_thresh,
stop_thresh=args.stop_thresh,
max_iter=args.max_iter,
max_DR=args.max_DR,
intselfcal_min_iter=args.intselfcal_min_iter,
polselfcal_min_iter=args.polselfcal_min_iter,
conv_frac=args.conv_frac,
solint=args.solint,
uvrange=args.uvrange,
minuv=args.minuv,
weight=args.weight,
robust=args.robust,
applymode=args.applymode,
min_tol_factor=args.min_tol_factor,
do_apcal=args.do_apcal,
do_polcal=args.do_polcal,
solar_selfcal=args.solar_selfcal,
use_solarflagger=args.use_solarflagger,
keep_backup=args.keep_backup,
verbose=args.verbose,
cpu_frac=args.cpu_frac,
mem_frac=args.mem_frac,
jobid=args.jobid,
)
return msg