Source code for paircars.utils.selfcal_utils

import numpy as np
import numexpr as ne
import traceback
import glob
import os
import subprocess
from casatools import msmetadata
from astropy.io import fits
from functools import partial
from .basic_utils import suppress_output, ra_dec_to_hms_dms, mjdsec_to_timestamp
from .resource_utils import limit_threads
from .flagging import do_flag_backup, flag_quartical_table, get_chans_flag
from .solarflagger import flagger
from .calibration import (
    fluxcal_caltable,
    uvrange_casa_to_quartical,
    quartical_matrix_normalize,
    get_cal_flag_info,
)
from .casatasks import normalized_crosscorr_ms
from .imaging import (
    calc_sun_dia,
    get_optimal_image_interval,
    calc_multiscale_scales,
    get_multiscale_bias,
)
from .image_utils import (
    create_circular_mask,
    create_circular_mask_array,
    calc_dyn_range,
    generate_tb_map,
    make_timeavg_image,
    make_stokes_wsclean_imagecube,
)
from .udocker_utils import run_wsclean, run_quartical
from .sunpos_utils import cal_solar_phaseshift, shift_solarcenter


[docs] def cal_crossphase(imagename): """ Function to calculate Stokes U, V leakage through correlation analysis Parameters ---------- imagename : str FITS image Returns ------- float Cross hand phase """ data = fits.getdata(imagename) u_data = data[2, 0, ...].astype(np.float64) v_data = data[3, 0, ...].astype(np.float64) max_pos = np.where(np.abs(v_data) == np.nanmax(np.abs(v_data))) peak_v = v_data[max_pos][0] crossphase_list = np.arange(-180, 180, 1) psi = np.deg2rad(crossphase_list) if u_data.size > 1 and v_data.size > 1: cospsi = np.cos(psi)[:, None, None] sinpsi = np.sin(psi)[:, None, None] u = u_data[None, ...] v = v_data[None, ...] # rotation using numexpr new_u = ne.evaluate("cospsi*u + sinpsi*v") new_v = ne.evaluate("-sinpsi*u + cospsi*v") # compute correlation coefficient for each psi cc_list = [] for i in range(len(crossphase_list)): cc = abs(np.corrcoef(new_u[i].ravel(), new_v[i].ravel())[1, 0]) cc_list.append(cc) cc_list = np.array(cc_list) pos = np.argsort(cc_list) cross_phases = crossphase_list[pos[0:4]] psi = np.deg2rad(cross_phases) cospsi = np.cos(psi)[:, None, None] sinpsi = np.sin(psi)[:, None, None] u = u_data[None, ...] v = v_data[None, ...] new_u = ne.evaluate("cospsi*u + sinpsi*v") new_v = ne.evaluate("-sinpsi*u + cospsi*v") x = np.nanmax(np.abs(new_u), axis=(-2, -1)) pos = np.argsort(x) for i in pos[0:2]: peak_new_v = new_v[i][max_pos] if np.sign(peak_new_v) == np.sign(peak_v): cross_phase = cross_phases[i] return cross_phase
[docs] def do_uvsub_flag(msname, threshold_list=[10, 7, 5], ncpu=1): """ Perform uv-sub flags Parameters ---------- msname : str Measurement set threshold_list: list, optional Threshold list ncpu: int, optional Number of CPU threads to use """ for threshold in threshold_list: count = 0 while count < 5: result, n_final_flagged, n_additional_flagged = flagger( msname, "residual", threshold=threshold, num_processes=ncpu, flagbackup=False, ) if n_additional_flagged == 0: break else: count += 1
[docs] def determine_disk_visibility(msname): """ Determine whether solar disk is visible or not Parameters ---------- msname : str Measurement set Returns ------- numpy.array Channel list where disk may not be detected numpy.array Timestamp list where disk may not be detected numpy.array Timestamps where disk is detected at least in one channel """ from casatools import ms as casamstool, table msmd = msmetadata() msmd.open(msname) freq = msmd.meanfreq(0) msmd.nchan(0) msmd.close() wavelength = (3 * 10**8) / freq uvdist = 10.0 * wavelength tb = table() tb.open(msname) colnames = tb.colnames() tb.close() if "CORRECTED_DATA" in colnames: datacolumn = "corrected" else: datacolumn = "data" normed_msname = normalized_crosscorr_ms(msname, datacolumn=datacolumn.upper()) mstool = casamstool() uvdist = 150.0 * wavelength mstool.open(normed_msname) mstool.select({"uvdist": [uvdist - 10.0, uvdist + 10.0]}) if datacolumn == "corrected": data_first_lobe = np.nanmedian( np.abs(mstool.getdata("CORRECTED_DATA", ifraxis=True)["corrected_data"]), axis=2, ) else: data_first_lobe = np.nanmedian( np.abs(mstool.getdata("DATA", ifraxis=True)["data"]), axis=2 ) mstool.close() r_I = (data_first_lobe[0, ...] + data_first_lobe[-1, ...]) / 2.0 detected = r_I < 0.1 n_detected_per_time = np.nansum(detected, axis=0) detected_timestamps = np.where(n_detected_per_time > 0)[0] pos = np.where(r_I >= 0.1) os.system(f"rm -rf {normed_msname}") if len(pos) == 0: return np.array([], dtype=int), np.array([], dtype=int), detected_timestamps elif len(pos) == 1: chans = pos[0] timestamps = np.zeros_like(chans) return chans, timestamps, detected_timestamps else: chans = pos[0] timestamps = pos[1] return chans, timestamps, detected_timestamps
[docs] def flag_non_disk(msname): """ Flag spectro-temporal blocks when solar disk is not visible Parameters ---------- msname : str Measurement set """ from casatasks import flagdata try: chans, timestamps, detected_timestamps = determine_disk_visibility(msname) if len(detected_timestamps) == 0: return 1 else: msmd = msmetadata() msmd.open(msname) times = msmd.timesforspws(0) msmd.close() for c, t in zip(chans, timestamps): spw = f"0:{c}" timerange = f"{mjdsec_to_timestamp(times[t], str_format=1)}" try: flagdata( vis=msname, mode="manual", spw=spw, timerange=timerange, flagbackup=False, ) except Exception: continue unflag_chans, flag_chans = get_chans_flag(msname) if len(unflag_chans) == 0: return 1 else: return 0 except Exception: traceback.print_exc() return 1
[docs] def get_quiet_sun_flux(freq): """ Get quiet Sun flux density in Jy. Parameters ---------- freq : float Frequency in MHz Returns ------- float Flux density in Jy """ p = np.poly1d([-1.93715165e-06, 7.84627718e-04, -3.15744433e-02, 2.32834400e-01]) flux = p(freq) * 10**4 # Polynomial return in SFU return flux
[docs] def make_qs_model(msname, clname="quiet_sun.cl"): """ Make CASA component list of quiet Sun model Parameters ---------- msname : str Name of the measurement set clname : str, optional Name of the component list Returns ------- str Name of the component list file """ from casatools import componentlist msmd = msmetadata() msmd.open(msname) freq = msmd.meanfreq(0, unit="MHz") phasecenter = msmd.phasecenter(0) msmd.close() radeg = np.rad2deg(phasecenter["m0"]["value"]) decdeg = np.rad2deg(phasecenter["m1"]["value"]) rahms, decdms = ra_dec_to_hms_dms(radeg, decdeg) radec_str = f"J2000 {rahms} {decdms}" sun_size = calc_sun_dia(freq) # In arcmin QS_flux = get_quiet_sun_flux(freq) # In Jy # Make sure the component list does not already exist. The tool will complain otherwise. os.system("rm -rf " + clname) cl = componentlist() cl.addcomponent( dir=radec_str, flux=QS_flux, # For a gaussian, this is the integrated area. fluxunit="Jy", freq=f"{freq}MHz", shape="gaussian", ## Gaussian majoraxis=f"{sun_size}arcmin", minoraxis=f"{sun_size}arcmin", positionangle="0deg", spectrumtype="spectral index", index=0.0, ) # Save the file cl.rename(filename=clname) cl.done() return clname
[docs] def quiet_sun_selfcal(msname, logger, selfcaldir, refant="1", solint="60s"): """ Perform quiet Sun Gaussian model based self-calibration Parameters ---------- msname : str Measurement set logger : str Python logger selfcaldir : str Self-calibration directory refant : str, optional Reference antenna solint : str, optional Solution interval Returns ------- int Success message str Caltable name """ from casatasks import ft, delmod, gaincal, applycal, flagmanager prefix = ( selfcaldir + "/" + os.path.basename(msname).split(".ms")[0] + "_selfcal_present" ) bpass_caltable = prefix.replace("present", f"{0}") + ".gcal" if os.path.exists(bpass_caltable): os.system("rm -rf " + bpass_caltable) do_flag_backup(msname, flagtype="qs_selfcal") try: result = flag_non_disk(msname) if result != 0: logger.info("Could not flag non-disk time properly.") msg = 1 else: ################################### # Import simulated QS model ################################### qs_model = make_qs_model( msname, clname=f"{os.path.basename(msname).split('.ms')[0]}_qs.cl" ) delmod(vis=msname, otf=True, scr=True) ft(vis=msname, complist=qs_model, usescratch=True) os.system(f"rm -rf {qs_model}") ##################### # Perform calibration ##################### logger.info( f"gaincal(vis='{msname}',caltable='{bpass_caltable}',uvrange='<100lambda',refant='{refant}',solint='{solint}',minsnr=1,calmode='p')\n" ) with suppress_output(): gaincal( vis=msname, caltable=bpass_caltable, uvrange="<100lambda", refant=refant, minsnr=1, solint=f"{solint}", solnorm=True, calmode="p", ) if not os.path.exists(bpass_caltable): logger.info("No gain solutions are found.\n") msg = 2 bpass_caltable = "" else: ######################## # Applying solutions ######################## logger.info( f"applycal(vis={msname},gaintable=[{bpass_caltable}],interp=['linear'],applymode='calonly',calwt=[False])\n" ) with suppress_output(): applycal( vis=msname, gaintable=[bpass_caltable], interp=["linear"], applymode="calonly", calwt=[False], ) msg = 0 except Exception: logger.exception(traceback.print_exc()) msg = 2 bpass_caltable = "" finally: with suppress_output(): flagmanager(vis=msname, mode="restore", versionname="qs_selfcal_1") flagmanager(vis=msname, mode="delete", versionname="qs_selfcal_1") return msg, bpass_caltable
[docs] def check_valid_image(imagename): """ Check whether the image is valid or not Parameters ---------- imagename : str Image name Returns ------- bool Whether valid image or not """ data = fits.getdata(imagename) if np.nansum(data) == 0: return False else: return True
[docs] def single_image_update_phasecenter( wsclean_images, wsclean_models, image_cube, model_cube, cellsize, imsize, stokes, logger, ): """ Update phase center of a single set of polarisation image Parameters ---------- wsclean_images : list List of wsclean Stokes images wsclean_models : list List of wsclean Stokes models image_cube : str Stokes image cube name model_cube : str Stokes model cube name cellsize : float Pixel size in arcseconds imsize : int Image size stokes : str Stokes planes of image cube Returns ------- int Success message bool Whether phase shift needed or not float Maximum offset in pixel """ try: ( msg, ra, dec, sun_radeg, sun_decdeg, apparent_pix_ra, apparent_pix_dec, seperation_arcsec, ) = cal_solar_phaseshift(image_cube) if msg != 0: return msg, False shift_needed = False r_offset_list = [] logger.info(f"Shift {seperation_arcsec}arcsec for {image_cube}.") shift_func = partial( shift_solarcenter, sun_radeg=sun_radeg, sun_decdeg=sun_decdeg, apparent_pix_ra=apparent_pix_ra, apparent_pix_dec=apparent_pix_dec, overwrite=True, ) msg, outfile, shifted, r_offset = shift_func(image_cube) shift_needed = bool(shift_needed + shifted) r_offset_list.append(r_offset) for imagename in wsclean_images: msg, outfile, shifted, r_offset = shift_func(imagename) shift_needed = bool(shift_needed + shifted) r_offset_list.append(r_offset) msg, outfile, shifted, r_offset = shift_func(model_cube) shift_needed = bool(shift_needed + shifted) r_offset_list.append(r_offset) for modelname in wsclean_models: msg, outfile, shifted, r_offset = shift_func(modelname) shift_needed = bool(shift_needed + shifted) r_offset_list.append(r_offset) r_offset = max(r_offset_list) return 0, shift_needed, r_offset except Exception: traceback.print_exc() return 1, False, 0
[docs] def correct_spectrosnap_phaseshift( image_dic, model_dic, cellsize, imsize, stokes, logger, ): """ Correct spectrocopic snapshot images for phase shift Parameters ---------- image_dic : dict Image dictionary model_dic : dict Model dictionary cellsize : float Pixel size in arcsecond imsize : int Image size stokes : str Stokes planes of image list logger : logger, optional Python logger Returns ------- int Success message bool If shift needed for any image int Maximum pixel offset """ shifted = False max_pixel_offset_list = [] try: images = list(image_dic.keys()) models = list(model_dic.keys()) for i in range(len(images)): imagename = images[i] modelname = models[i] wsclean_images = image_dic[imagename] wsclean_models = model_dic[modelname] valid_image = check_valid_image(imagename) if valid_image: success_msg, shift_needed, max_pixel_offset = ( single_image_update_phasecenter( wsclean_images, wsclean_models, imagename, modelname, cellsize, imsize, stokes, logger, ) ) max_pixel_offset_list.append(max_pixel_offset) if shift_needed: shifted = True return 0, shifted, max(max_pixel_offset_list) except Exception: traceback.print_exc() return 1, shifted, 0
[docs] def calc_leakage(imagename, threshold=5, disc_size=50): """ Calculate Stokes I to Q, U, V leakages Parameters ---------- imagename : str Image name threshold : float Threshold to choose region with Stokes I detection disc_size : float Solar disc area in arcminute to mask for calculating rms N.B.: Chosen slightly larger to avoid any off-coronal emission from CMEs Returns ------- float Stokes I to Q leakage float Stokes I to U leakage float Stokes I to V leakage float Stokes I to Q leakage error float Stokes I to U leakage error float Stokes I to V leakage error """ valid_image = check_valid_image(imagename) if valid_image is False: return np.nan, np.nan, np.nan, np.nan, np.nan, np.nan tb_map = generate_tb_map(imagename) tb_data = fits.getdata(tb_map)[0, 0, ...] / 10**6 # in MK data = fits.getdata(imagename) header = fits.getheader(imagename) pix_size = abs(header["CDELT1"]) * 3600.0 # In arcsec radius = int((disc_size * 60) / pix_size) i_data = data[0, 0, ...] q_data = data[1, 0, ...] u_data = data[2, 0, ...] v_data = data[3, 0, ...] ############################# # Calculating image rms ############################# mask = create_circular_mask_array(i_data, radius) i_rms = np.nanstd(i_data[~mask]) i_thresh = threshold * i_rms ############################################## # Estimating regions for leakage calculation ############################################## pos = np.where((i_data < i_thresh) | (tb_data > 1.0)) q_data[pos] = np.nan u_data[pos] = np.nan v_data[pos] = np.nan q_by_i = q_data / i_data u_by_i = u_data / i_data v_by_i = v_data / i_data q_by_i = q_by_i[~np.isnan(q_by_i)].flatten() u_by_i = u_by_i[~np.isnan(u_by_i)].flatten() v_by_i = v_by_i[~np.isnan(v_by_i)].flatten() ######################################### # Estimating leakage and leakage errors ######################################### q_leakage = round(np.nanmedian(q_by_i), 4) u_leakage = round(np.nanmedian(u_by_i), 4) v_leakage = round(np.nanmedian(v_by_i), 4) q_leakage_err = round( 1.253 * np.nanmedian(abs(q_by_i - q_leakage)) / np.sqrt(q_by_i.size), 6 ) u_leakage_err = round( 1.253 * np.nanmedian(abs(u_by_i - u_leakage)) / np.sqrt(u_by_i.size), 6 ) v_leakage_err = round( 1.253 * np.nanmedian(abs(v_by_i - v_leakage)) / np.sqrt(v_by_i.size), 6 ) os.system(f"rm -rf {tb_map}") return q_leakage, u_leakage, v_leakage, q_leakage_err, u_leakage_err, v_leakage_err
[docs] def correct_image_leakage( imagename, modelname="", q_leakage=0.0, u_leakage=0.0, v_leakage=0.0, threshold=5, disc_size=50, ): """ Correct leakages in image plane Parameters ---------- imagename : str Image name modelname : str, optional Model name q_leakage : float, optional Q leakage u_leakage : float, optional U leakage v_leakage : float, optional V leakage threshold : float Threshold to choose region with Stokes I detection disc_size : float Solar disc area in arcminute to mask for calculating rms N.B.: Chosen slightly larger to avoid any off-coronal emission from CMEs Returns ------- str Leakage corrected imagename str Leakage corrected modelname """ ####################### # Read image data ####################### imagedata = fits.getdata(imagename) image_I = imagedata[0, 0, ...] image_Q = imagedata[1, 0, ...] image_U = imagedata[2, 0, ...] image_V = imagedata[3, 0, ...] if os.path.exists(modelname): correct_model = True else: correct_model = False if correct_model: ########################## # Read model data ########################## modeldata = fits.getdata(modelname) model_I = modeldata[0, 0, ...] model_Q = modeldata[1, 0, ...] model_U = modeldata[2, 0, ...] model_V = modeldata[3, 0, ...] modelheader = fits.getheader(modelname) ################################### # Creating mask #################################### imageheader = fits.getheader(imagename) pix_size = abs(imageheader["CDELT1"]) * 3600.0 # In arcsec radius = int((disc_size * 60) / pix_size) mask = create_circular_mask_array(image_I, radius) #################################### # Calculate rms #################################### q_rms = np.nanstd(image_Q[~mask]) u_rms = np.nanstd(image_U[~mask]) v_rms = np.nanstd(image_V[~mask]) ################################### # Correcting images ################################### image_Q = image_Q - (q_leakage * image_I) image_U = image_U - (u_leakage * image_I) image_V = image_V - (v_leakage * image_I) posq = np.where(abs(image_Q) < threshold * q_rms) posu = np.where(abs(image_U) < threshold * u_rms) posv = np.where(abs(image_V) < threshold * v_rms) imagedata[1, 0, ...] = image_Q imagedata[2, 0, ...] = image_U imagedata[3, 0, ...] = image_V fits.writeto( imagename.split(".fits")[0] + "_leakagecor.fits", data=imagedata, header=imageheader, overwrite=True, ) if correct_model: #################################### # Correcting model images #################################### model_Q = model_Q - (q_leakage * model_I) model_U = model_U - (u_leakage * model_I) model_V = model_V - (v_leakage * model_I) model_Q[posq] = 0.0 model_U[posu] = 0.0 model_V[posv] = 0.0 modeldata[1, 0, ...] = model_Q modeldata[2, 0, ...] = model_U modeldata[3, 0, ...] = model_V modeldata[np.isinf(modeldata)] = 0.0 modeldata[np.isnan(modeldata)] = 0.0 fits.writeto( modelname.split(".fits")[0] + "_leakagecor.fits", data=modeldata, header=modelheader, overwrite=True, ) if correct_model: return ( imagename.split(".fits")[0] + "_leakagecor.fits", modelname.split(".fits")[0] + "_leakagecor.fits", ) else: return ( imagename.split(".fits")[0] + "_leakagecor.fits", None, )
[docs] def correct_pbcor_leakage( imagename, modelname, metafits, pbcor=True, leakagecor=True, pbuncor=True, ncpu=1, ): """ Perform primary beam and leakage correction Parameters ---------- imagename : str Image name modelname : str Model image name metafits : str Metafits file pbcor : bool, optional Perform primary beam correction leakagecor : bool, optional Perform image based residual leakage correction pbuncor : bool, optional Undo primary beam correction ncpu : int, optional Number of CPU threads Returns ------- str Final image str Final model list Leakage and leakage error list """ ncpu = max(1, ncpu) leakage_info = [] freq = fits.getheader(imagename)["CRVAL3"] pbfile = f"freq_{freq}_pb.npy" if pbcor is False: pbcor_image = imagename pbcor_model = modelname else: #################################### # Correcting image #################################### pbcor_image = imagename.split(".fits")[0] + "_pbcor.fits" pbcor_cmds = [ "run-mwa-singlepbcor", imagename, metafits, pbcor_image, "--interpolated", "--num_threads", f"{ncpu}", "--pb_jones_file", f"{pbfile}", ] if os.path.exists(pbfile) is False: pbcor_cmds.append("--save_pb") subprocess.run( pbcor_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, check=False, ) ######################################### # Correcting model ######################################### pbcor_model = modelname.split(".fits")[0] + "_pbcor.fits" pbcor_cmds = [ "run-mwa-singlepbcor", modelname, metafits, pbcor_model, "--interpolated", "--num_threads", f"{ncpu}", "--pb_jones_file", f"{pbfile}", ] subprocess.run( pbcor_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, check=False, ) if leakagecor is False: leakagecor_image = pbcor_image leakagecor_model = pbcor_model else: ######################################## # Estimating and correcting leakage ######################################## ( q_leakage, u_leakage, v_leakage, q_leakage_err, u_leakage_err, v_leakage_err, ) = calc_leakage(pbcor_image) leakage_info = [ q_leakage, u_leakage, v_leakage, q_leakage_err, u_leakage_err, v_leakage_err, ] leakagecor_image, leakagecor_model = correct_image_leakage( pbcor_image, modelname=pbcor_model, q_leakage=q_leakage, u_leakage=u_leakage, v_leakage=v_leakage, ) if pbuncor is False: final_image = leakagecor_image final_model = leakagecor_model else: ########################################## # Restore primary beam corrections ########################################### # For image ############### final_image = imagename.split(".fits")[0] + "_pbuncor.fits" pbcor_cmds = [ "run-mwa-singlepbcor", leakagecor_image, metafits, final_image, "--interpolated", "--num_threads", f"{ncpu}", "--pb_jones_file", f"{pbfile}", "--restore", ] if os.path.exists(pbfile) is False: pbcor_cmds.append("--save_pb") subprocess.run( pbcor_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, check=False, ) ################# # For model ################# final_model = modelname.split(".fits")[0] + "_pbuncor.fits" pbcor_cmds = [ "run-mwa-singlepbcor", leakagecor_model, metafits, final_model, "--interpolated", "--num_threads", f"{ncpu}", "--pb_jones_file", f"{pbfile}", "--restore", ] if os.path.exists(pbfile) is False: pbcor_cmds.append("--save_pb") subprocess.run( pbcor_cmds, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, check=False, ) return final_image, final_model, leakage_info
[docs] def single_image_update_leakage( wsclean_images, wsclean_models, image_cube, model_cube, metafits, pbcor=True, leakagecor=True, pbuncor=True, ncpu=-1, ): """ Update leakage of a single set pf polarisation image Parameters ---------- wsclean_images : list List of wsclean Stokes images wsclean_models : list List of wsclean Stokes models image_cube : str Stokes image cube name model_cube : str Stokes model cube name metafits : str Metafits file pbcor : bool, optional Perform primary beam correction or not leakagecor : bool, optional Perform leakage correction or not pbuncor : bool, optional Undo primary beam correction or not ncpu : int, optional NUmber of CPU threads to use Returns ------- list Leakage informations """ ncpu = max(1, ncpu) valid_image = check_valid_image(image_cube) if valid_image: cor_imagename, cor_modelname, leakage_info = correct_pbcor_leakage( image_cube, model_cube, metafits, pbcor=pbcor, leakagecor=leakagecor, pbuncor=pbuncor, ncpu=ncpu, ) image_data = fits.getdata(cor_imagename) image_data[np.isinf(image_data)] = 0.0 image_data[np.isnan(image_data)] = 0.0 model_data = fits.getdata(cor_modelname) model_data[np.isinf(model_data)] = 0.0 model_data[np.isnan(model_data)] = 0.0 for i in range(len(wsclean_images)): spectro_image_header = fits.getheader(wsclean_images[i]) spectro_image_data = fits.getdata(wsclean_images[i]) spectro_image_data[0, 0, ...] = image_data[i, 0, ...] fits.writeto( wsclean_images[i], data=spectro_image_data, header=spectro_image_header, overwrite=True, ) spectro_model_header = fits.getheader(wsclean_models[i]) spectro_model_data = fits.getdata(wsclean_models[i]) spectro_model_data[0, 0, ...] = model_data[i, 0, ...] fits.writeto( wsclean_models[i], data=spectro_model_data, header=spectro_model_header, overwrite=True, ) return leakage_info else: return
[docs] def correct_spectrosnap_pbleak( image_dic, model_dic, metafits, logger=None, pbcor=True, leakagecor=True, pbuncor=True, ncpu=-1, ): """ Correct spectrocopic snapshot images for leakage Parameters ---------- image_dic : dict Image dictionary model_dic : dict Model dictionary metafits : str Metafits file logger : logger, optional Python logger pbcor : bool, optional Perform primary beam correction leakagecor : bool, optional Leakage correction pbuncor : bool, optional Undo primary beam correction ncpu : int, optional Number of CPU threads to use Returns ------- list Leakage information list """ ncpu = max(1, ncpu) images = list(image_dic.keys()) models = list(model_dic.keys()) leakage_info_list = [] for i in range(len(images)): imagename = images[i] modelname = models[i] fits.getheader(imagename) if "MFS" not in imagename: wsclean_images = image_dic[imagename] wsclean_models = model_dic[modelname] valid_image = check_valid_image(imagename) if valid_image: if logger is not None: logger.info(f"Leakage correction for: {imagename}.") else: print(f"Leakage correction for: {imagename}.") leakage_info = single_image_update_leakage( wsclean_images, wsclean_models, imagename, modelname, metafits, pbcor=pbcor, leakagecor=leakagecor, pbuncor=pbuncor, ncpu=ncpu, ) if leakage_info is not None: leakage_info_list.append(leakage_info) os.system("rm -rf *_pbcor.fits *_leakagecor.fits *_pbuncor.fits *pb.npy") return leakage_info_list
[docs] def selfcal_round( msname, metafits, logger, selfcaldir, cellsize, imsize, round_number=0, uvrange="", minuv=0, calmode="ap", solint="60s", solnorm=True, refant="1", do_bandpass=True, applymode="calonly", threshold=3, weight="briggs", robust=0.0, use_previous_model=False, use_solar_mask=True, mask_radius=40, min_tol_factor=-1, calc_chunks=True, fluxscale_mwa=False, solar_attn=10, pbcor=True, leakagecor=True, pbuncor=True, do_intensity_cal=False, do_polcal=False, solve_array_leakage=False, pol_solnorm=False, do_flag=False, restore_flag=True, ncpu=-1, mem=-1, ): """ A single self-calibration round Parameters ---------- msname : str Name of the measurement set metafits : str Metafits file logger : logger Python logger selfcaldir : str Self-calibration directory cellsize : float Cellsize in arcsec imsize : int Image pixel size round_number : int, optional Selfcal iteration number uvrange : float, optional UV range for calibration minuv : float, optional Minimum uv in lambda calmode : str, optional Calibration mode ('p' or 'ap') solint : str, optional Solution intervals solnorm : bool, optional Solution normalisation refant : str, optional Reference antenna do_bandpass: bool, optional Perform bandpass calibration applymode : str, optional Solution apply mode (calonly or calflag) threshold : float, optional Imaging and auto-masking threshold weight : str, optional Image weighting robust : float, optional Robust parameter for briggs weighting use_previous_model : bool, optional Use previous model use_solar_mask : bool, optional Use solar disk mask or not mask_radius : float, optional Mask radius in arcminute min_tol_factor : float, optional Minimum tolerance factor calc_chunks : bool, optional Whether calculate spectro-temporal chunks or not fluxscale_mwa : bool, optional Fluxscale caltable using reference bandpass solar_attn : float, optional Solar attenuation in dB (only used if fluxscale_mwa is True) pbcor : bool, optional Primary beam correction leakagecor : bool, optional Leakage correction pbuncor : bool, optional Undo primary beam correction do_intensity_cal : bool, optional Perform intensity self-calibration do_polcal : bool, optional Perform polarisation calibration or not solve_array_leakage : bool, optional Perform a single leakage correction over the entire array pol_solnorm : bool, optional Normalise quartical solutions or not do_flag : bool, optional Perform UVsub flagging restore_flag : bool, optional Restore last round flags or not ncpu : int, optional Number of CPUs to use in WSClean mem : float, optional Memory usage limit in WSClean Returns ------- int Success message list Caltable name list float RMS based dynamic range float RMS of the image str Image name str Model image name str Residual image name list Leakage informations [Q_leakage, U_leakage, V_leakage, Q_leakage_error, U_leakage_error, V_leakage_error] int Maximum pixel offset """ ncpu = max(1, ncpu) mem = max(1, mem) limit_threads(n_threads=ncpu) from casatasks import gaincal, bandpass, applycal, flagdata, delmod, flagmanager from casatools import table cwd = os.getcwd() msname = msname.rstrip("/") msname = os.path.abspath(msname) os.chdir(selfcaldir) if minuv > 0: if uvrange == "" or uvrange.startswith(">"): uvrange = f">{minuv}lambda" elif uvrange.startswith("<"): maxuv = uvrange.split("lambda")[0].split("<")[1] uvrange = f"{minuv}~{maxuv}lambda" elif "~" in uvrange: maxuv = uvrange.split("lambda")[0].split("~")[-1] uvrange = f"{minuv}~{maxuv}lambda" if not use_previous_model: delmod(vis=msname, otf=True, scr=True) prefix = ( selfcaldir + "/" + os.path.basename(msname).split(".ms")[0] + "_selfcal_present" ) applycal_gaintable = [] interp = [] leakage_info_list = [] try: #################################### # Determining ms metadata #################################### msmd = msmetadata() msmd.open(msname) freq = msmd.meanfreq(0, unit="MHz") num_chan = msmd.nchan(0) freqres = msmd.chanres(0, unit="MHz")[0] bw = num_chan * freqres msmd.close() ##################################### # Get column names ##################################### tb = table() tb.open(msname) tb.colnames() tb.close() ###################################### # Determine spectro-temporal chunking ###################################### if calc_chunks: header = fits.getheader(metafits) mode = header["MODE"] if "MWAX" in mode: flag_central_chan = False else: flag_central_chan = True if calmode == "ap" or do_polcal: nchans = max(1, int(bw / 0.32)) # Fixed to 320 kHz else: nchans = 1 if min_tol_factor <= 0: min_tol_factor = 1.0 # In percentage msmd = msmetadata() msmd.open(msname) times = msmd.timesforspws(0) msmd.close() diff = np.diff(times) change_idx = np.where(np.diff(diff) != 0)[0] max_ntime = int(len(change_idx) / 2) + 1 nintervals, nchans_variablity = get_optimal_image_interval( msname, temporal_tol_factor=float(min_tol_factor / 100.0), spectral_tol_factor=float(min_tol_factor / 100.0), flag_central_chan=flag_central_chan, max_ntime=max_ntime, ) nchans = min(nchans, nchans_variablity) else: nchans = 1 nintervals = 1 max_ntime = 1 os.system(f"rm -rf {prefix}*image.fits {prefix}*residual.fits") if weight == "briggs": weight += " " + str(robust) wsclean_args = [ "-quiet", f"-scale {cellsize}asec", f"-size {imsize} {imsize}", "-no-dirty", "-gridder wgridder", f"-weight {weight}", "-niter 10000", "-mgain 0.85", "-nmiter 5", "-gain 0.1", f"-minuv-l {minuv}", f"-j {ncpu}", f"-abs-mem {mem}", f"-auto-mask {threshold + 0.1}", f"-auto-threshold {threshold}", ] if do_intensity_cal: wsclean_args.append("-pol I") pol = "I" if calmode == "p": wsclean_args.append("-no-negative") else: wsclean_args.append("-pol IQUV") pol = "IQUV" ngrid = max(1, int(ncpu / 2)) if ngrid > 1: wsclean_args.append(f"-parallel-gridding {ngrid}") ################################################ # Creating and using solar mask ################################################ fits_mask = msname.split(".ms")[0] + "_solar-mask.fits" if not os.path.exists(fits_mask): logger.info(f"Creating solar mask of size: {mask_radius} arcmin.\n") fits_mask = create_circular_mask( msname, cellsize, imsize, mask_radius=mask_radius ) if fits_mask is not None and os.path.exists(fits_mask) and use_solar_mask: wsclean_args.append(f"-fits-mask {fits_mask}") ###################################### # Determining multiscale parameter ###################################### sun_dia = calc_sun_dia(freq) # Sun diameter in arcmin sun_rad = sun_dia / 2.0 multiscale_scales = calc_multiscale_scales(msname, 3, max_scale=sun_rad) scale_bias = round(get_multiscale_bias(freq), 2) wsclean_args.append("-multiscale") wsclean_args.append("-multiscale-gain 0.1") wsclean_args.append( "-multiscale-scales " + ",".join([str(s) for s in multiscale_scales]) ) wsclean_args.append(f"-multiscale-scale-bias {scale_bias}") if imsize >= 1024 and 4 * max(multiscale_scales) < 512: wsclean_args.append("-parallel-deconvolution 512") ##################################### # Temporal imaging configuration ##################################### logger.info(f"Temporal chunks: {nintervals}, Spectral chunks: {nchans}.") if nintervals > 1: wsclean_args.append(f"-intervals-out {nintervals}") if nchans > 1: wsclean_args.append(f"-channels-out {nchans}") wsclean_args.append("-gap-channel-division") wsclean_args.append("-no-mf-weighting") ##################################### # Image naming ##################################### wsclean_args.append(f"-name {prefix}") if use_previous_model and do_polcal is False: previous_models = glob.glob(f"{prefix}*model.fits") total_models_expected = nintervals * nchans if len(previous_models) == total_models_expected: wsclean_args.append("-continue") else: os.system(f"rm -rf {prefix}*") wsclean_cmd = "wsclean " + " ".join(wsclean_args) + " " + msname logger.info(f"\nWSClean command: {wsclean_cmd}\n") msg = run_wsclean(wsclean_cmd, "paircarswsclean", verbose=False) if msg != 0: logger.error("Imaging is not successful.\n") return 1, applycal_gaintable, 0, 0, "", "", "", [], 0 ####################################### # Making stokes cube ####################################### pollist = list(pol) wsclean_images_dic = {} wsclean_models_dic = {} wsclean_residuals_dic = {} for suffix in ["image", "model", "residual"]: stokeslist = [] for p in pollist: if pollist == ["I"]: stokeslist.append( sorted(glob.glob(prefix + "*" + f"-{suffix}.fits")) ) else: stokeslist.append( sorted(glob.glob(prefix + "*" + p + f"-{suffix}.fits")) ) for i in range(len(stokeslist[0])): wsclean_images = sorted([stokeslist[k][i] for k in range(len(pollist))]) image_prefix = ( selfcaldir + "/" + os.path.basename(wsclean_images[0]) .split(f"-{suffix}")[0] .split("-I")[0] ) image_cube = make_stokes_wsclean_imagecube( wsclean_images, image_prefix + f"-IQUV-{suffix}.fits", keep_wsclean_images=True, ) if suffix == "image": wsclean_images_dic[image_cube] = wsclean_images elif suffix == "model": wsclean_models_dic[image_cube] = wsclean_images elif suffix == "residual": wsclean_residuals_dic[image_cube] = wsclean_images ######################################### # Shifting solar center to phase center ######################################### logger.info("Shifting images...") shifting_msg, shifted, max_pixel_offset = correct_spectrosnap_phaseshift( wsclean_images_dic, wsclean_models_dic, cellsize, imsize, pol, logger, ) if shifting_msg != 0: logger.warning("Error occured in phase shift correction.") else: logger.info("Image shift is done.") if do_polcal: ################################ # Leakage correction ################################ if pbcor is True or leakagecor is True or pbuncor is True: leakage_info_list = correct_spectrosnap_pbleak( wsclean_images_dic, wsclean_models_dic, metafits, logger=logger, pbcor=pbcor, leakagecor=leakagecor, pbuncor=pbuncor, ncpu=ncpu, ) #################################### # Predict models #################################### if do_polcal or shifted: prediction_failed = False delmod(vis=msname, otf=True, scr=True) wsclean_cmd = "wsclean " + " ".join(wsclean_args) + " -predict " + msname logger.info(f"\nWSClean command: {wsclean_cmd}\n") prediction_msg = run_wsclean(wsclean_cmd, "paircarswsclean", verbose=False) if prediction_msg != 0: prediction_failed = True if do_polcal: ####################################### # Remove chunk files ####################################### images = list(wsclean_images_dic.keys()) models = list(wsclean_models_dic.keys()) residuals = list(wsclean_residuals_dic.keys()) for i in range(len(images)): imagename = images[i] modelname = models[i] residualname = residuals[i] wsclean_images = wsclean_images_dic[imagename] wsclean_models = wsclean_models_dic[modelname] wsclean_residuals = wsclean_residuals_dic[residualname] for img in wsclean_images: os.system(f"rm -rf {img}") for mod in wsclean_models: os.system(f"rm -rf {mod}") for res in wsclean_residuals: os.system(f"rm -rf {res}") ##################################### # Analyzing images ##################################### wsclean_files = {} for suffix in ["image", "model", "residual"]: files = glob.glob(prefix + f"*MFS-{suffix}.fits") if not files: files = glob.glob(prefix + f"*{suffix}.fits") wsclean_files[suffix] = files wsclean_images = wsclean_files["image"] wsclean_models = wsclean_files["model"] wsclean_residuals = wsclean_files["residual"] ####################################################################### # Final frequency averaged images for backup or calculating dynamic ranges ####################################################################### if do_polcal: keep_wsclean_images = False else: keep_wsclean_images = True final_image = ( prefix.replace("present", f"{round_number}") + f"_{pol}_image.fits" ) final_model = ( prefix.replace("present", f"{round_number}") + f"_{pol}_model.fits" ) final_residual = ( prefix.replace("present", f"{round_number}") + f"_{pol}_residual.fits" ) if len(wsclean_images) == 0: logger.error("No image is made.") return 1, applycal_gaintable, 0, 0, "", "", "", [], 0 elif len(wsclean_images) == 1: os.system(f"cp -r {wsclean_images[0]} {final_image}") else: final_image = make_timeavg_image( wsclean_images, final_image, keep_wsclean_images=keep_wsclean_images ) if len(wsclean_models) == 1: os.system(f"cp -r {wsclean_models[0]} {final_model}") else: final_model = make_timeavg_image( wsclean_models, final_model, keep_wsclean_images=keep_wsclean_images ) if len(wsclean_residuals) == 1: os.system(f"cp -r {wsclean_residuals[0]} {final_residual}") else: final_residual = make_timeavg_image( wsclean_residuals, final_residual, keep_wsclean_images=keep_wsclean_images, ) os.system("rm -rf *psf.fits") ######################################### # Restoring previous round flags ######################################### with suppress_output(): flags = flagmanager(vis=msname, mode="list") keys = flags.keys() for k in keys: if k == "MS": pass else: version = flags[0]["name"] if "selfcal" in version: try: with suppress_output(): if restore_flag: flagmanager( vis=msname, mode="restore", versionname=version ) flagmanager(vis=msname, mode="delete", versionname=version) except BaseException: pass ##################################### # Calculating dynamic ranges ###################################### model_flux, rms_DR, rms = calc_dyn_range( final_image, final_model, final_residual, fits_mask=fits_mask, ) if model_flux == 0: logger.error("No model flux.\n") return 1, applycal_gaintable, 0, 0, "", "", "", [], 0 ############################ # Flag backup before selfcal ############################ do_flag_backup(msname, flagtype="selfcal") ######################################## # Check if any calibration is requested ######################################## if do_intensity_cal is False and do_polcal is False: logger.info("No calibration is requested. Returing only previous state.") return ( 2, applycal_gaintable, rms_DR, rms, final_image, final_model, final_residual, [], max_pixel_offset, ) ######################################### # If model prediction failed in polcal ######################################### if (do_polcal or shifted) and prediction_failed: logger.error("Error in predicting model.") return ( 2, applycal_gaintable, rms_DR, rms, final_image, final_model, final_residual, [], max_pixel_offset, ) ############################## # Perform intensity selfcal ############################## if do_intensity_cal: if fluxscale_mwa: solnorm = True ########################## # Perform gain calibration ########################## gain_caltable = prefix.replace("present", f"{round_number}") + ".gcal" if os.path.exists(gain_caltable): os.system("rm -rf " + gain_caltable) logger.info( f"gaincal(vis='{msname}',caltable='{gain_caltable}',uvrange='{uvrange}',refant='{refant}',solint='{solint}',calmode='{calmode}',minsnr=3,solnorm={solnorm})\n" ) with suppress_output(): gaincal( vis=msname, caltable=gain_caltable, uvrange=uvrange, refant=refant, minsnr=3, calmode=calmode, solint=f"{solint}", solnorm=solnorm, ) if not os.path.exists(gain_caltable): logger.error("No gain solutions are found.\n") return 3, applycal_gaintable, 0, 0, "", "", "", [], max_pixel_offset applycal_gaintable.append(gain_caltable) interp.append("linear") ################################# # Gaincal flagging ################################# ( _, _, _, pre_flag_frac, pre_chan_flag_frac, pre_ant_flag_frac, pre_time_flag_frac, ) = get_cal_flag_info(gain_caltable) do_flag_backup(gain_caltable, flagtype="gainflag") with suppress_output(): flagdata( vis=gain_caltable, mode="rflag", datacolumn="CPARAM", timedevscale=5.0, freqdevscale=5.0, flagbackup=False, ) ( _, _, _, flag_frac, chan_flag_frac, ant_flag_frac, time_flag_frac, ) = get_cal_flag_info(gain_caltable) if ( flag_frac - pre_flag_frac > 0.5 or ant_flag_frac - pre_ant_flag_frac > 0.5 or time_flag_frac - pre_time_flag_frac > 0.5 ): logger.info("Restoring flags of gaincal solutions.") flagmanager( vis=gain_caltable, mode="restore", versionname="gainflag_1", ) else: tb = table() tb.open(gain_caltable) gain = tb.getcol("CPARAM") flag = tb.getcol("FLAG") tb.close() gain[flag] = np.nan tb.open(gain_caltable, nomodify=False) new_gain = tb.getcol("CPARAM") shape = new_gain.shape for i in range(shape[0]): avg = np.nanmedian(np.abs(gain[i, ...])) new_gain[i, ...] = new_gain[i, ...] / avg tb.putcol("CPARAM", new_gain) tb.flush() tb.close() with suppress_output(): flagmanager(vis=gain_caltable, mode="delete", versionname="gainflag_1") if not do_bandpass and fluxscale_mwa: logger.info("Flux scaled gain caltable using MWA reference bandpass.") fluxcal_caltable(gain_caltable, attn=solar_attn) ################################## # Perform bandpass calibration ################################## if calmode == "ap" and do_bandpass: bpass_caltable = prefix.replace("present", f"{round_number}") + ".bcal" if os.path.exists(bpass_caltable): os.system("rm -rf " + bpass_caltable) logger.info( f"bandpass(vis='{msname}',caltable='{bpass_caltable}',uvrange='{uvrange}',refant='{refant}',solint='inf',gaintable=['{gain_caltable}'],interp={interp},minsnr=3,solnorm=True)\n" ) with suppress_output(): bandpass( vis=msname, caltable=bpass_caltable, uvrange=uvrange, refant=refant, minsnr=3, solint="inf", interp=interp, gaintable=[gain_caltable], solnorm=True, ) if not os.path.exists(bpass_caltable): logger.error("No bandpass solutions are found.\n") if fluxscale_mwa: logger.info( "Flux scaled gain caltable using MWA reference bandpass." ) fluxcal_caltable(gain_caltable, attn=solar_attn) else: applycal_gaintable.append(bpass_caltable) interp.append("linear,linear") ############################# # Bandpass flagging ############################# ( _, _, _, pre_flag_frac, pre_chan_flag_frac, pre_ant_flag_frac, pre_time_flag_frac, ) = get_cal_flag_info(bpass_caltable) do_flag_backup(bpass_caltable, flagtype="bpassflag") with suppress_output(): flagdata( vis=bpass_caltable, mode="rflag", datacolumn="CPARAM", timedevscale=5.0, freqdevscale=5.0, flagbackup=False, ) if ( flag_frac - pre_flag_frac > 0.5 or ant_flag_frac - pre_ant_flag_frac > 0.5 or chan_flag_frac - pre_chan_flag_frac > 0.5 ): logger.info("Restoring flags of bandpass solutions.") flagmanager( vis=bpass_caltable, mode="restore", versionname="bpassflag_1", ) else: tb = table() tb.open(bpass_caltable) gain = tb.getcol("CPARAM") flag = tb.getcol("FLAG") tb.close() gain[flag] = np.nan tb.open(bpass_caltable, nomodify=False) new_gain = tb.getcol("CPARAM") shape = new_gain.shape for i in range(shape[0]): avg = np.nanmedian(np.abs(gain[i, ...])) new_gain[i, ...] = new_gain[i, ...] / avg tb.putcol("CPARAM", new_gain) tb.flush() tb.close() with suppress_output(): flagmanager( vis=bpass_caltable, mode="delete", versionname="bpassflag_1" ) if fluxscale_mwa: logger.info( "Flux scaled bandpass caltable using MWA reference bandpass." ) fluxcal_caltable(bpass_caltable, attn=solar_attn) logger.info( f"applycal(vis='{msname}',gaintable={applycal_gaintable},interp={interp},applymode='{applymode}',calwt=[False],flagbackup=False)\n" ) with suppress_output(): applycal( vis=msname, gaintable=applycal_gaintable, interp=interp, applymode=applymode, calwt=[False], flagbackup=False, ) ################################################### # Perform polarisation calibration using quartical ################################################### if do_polcal: pol_caltable = prefix.replace("present", f"{round_number}") + ".dcal" quartical_log = prefix.replace("present", f"{round_number}") + ".qclog" if os.path.exists(pol_caltable): os.system(f"rm -rf {pol_caltable}") minuv, maxuv = uvrange_casa_to_quartical(msname, uvrange) quartical_args = [ "goquartical", f"input_ms.path={msname}", "input_ms.data_column=DATA", f"input_ms.select_uv_range=[{minuv},{maxuv}]", "input_model.recipe=MODEL_DATA", f"output.gain_directory={pol_caltable}", f"solver.reference_antenna={refant}", "output.overwrite=True", "output.log_to_terminal=True", f"output.log_directory={quartical_log}", "solver.terms=[D]", "solver.iter_recipe=[200]", "solver.propagate_flags=True", f"solver.threads={ncpu}", "dask.threads=1", "D.type=complex", ] if solint == "inf": quartical_args.append("D.time_interval=1") elif solint != "int": quartical_args.append(f"D.time_interval={solint}") else: quartical_args.append(f"D.time_interval={max_ntime}") if do_bandpass: quartical_args.append(f"D.freq_interval={int(freqres*1000.0)}kHz") else: quartical_args.append("D.freq_interval=1") if solve_array_leakage: quartical_args.append("D.solve_per=array") quartical_cmd = " ".join(quartical_args) logger.info(f"\nQuartical cmd: {quartical_cmd}\n") quartical_msg = run_quartical( quartical_cmd, "paircarsquartical", verbose=False ) os.system(f"rm -rf {quartical_log}") if quartical_msg != 0 or os.path.exists(pol_caltable) is False: logger.error("Quartical calibration is not successful.\n") return 3, [], 0, 0, "", "", "", [], max_pixel_offset applycal_gaintable.append(pol_caltable) ###################################### # Flagging quartical table ###################################### pol_caltable = flag_quartical_table(pol_caltable) ###################################### # Caltable normalisation ###################################### if pol_solnorm: pol_caltable = quartical_matrix_normalize(pol_caltable, overwrite=True) ###################################### # Applying quartical solutions ###################################### temp_pol_caltable = ( prefix.replace("present", f"{round_number}") + "_temp.dcal" ) quartical_args = [ "goquartical", f"input_ms.path={msname}", "input_ms.data_column=DATA", "output.log_to_terminal=True", f"output.log_directory={quartical_log}", f"output.gain_directory={temp_pol_caltable}", "output.overwrite=True", "output.products=[corrected_data]", "output.columns=[CORRECTED_DATA]", "output.flags=True", "solver.terms=[D]", "solver.iter_recipe=[0]", "solver.propagate_flags=True", f"solver.threads={ncpu}", "dask.threads=1", "D.type=complex", f"D.load_from={pol_caltable}/D", ] quartical_cmd = " ".join(quartical_args) logger.info(f"\nQuartical cmd: {quartical_cmd}\n") quartical_msg = run_quartical( quartical_cmd, "paircarsquartical", verbose=False ) os.system(f"rm -rf {quartical_log} {temp_pol_caltable}") if quartical_msg != 0: logger.error( "Quartical calibration applying solutions is not successful.\n" ) return 3, [], 0, 0, "", "", "", [], max_pixel_offset ##################################### # Flag zeros ##################################### with suppress_output(): flagdata( vis=msname, mode="clip", clipzeros=True, datacolumn="corrected", flagbackup=False, ) ###################################### # UVsub flagging ###################################### try: if do_flag: logger.info("Flagging in uv-domain data.\n") do_uvsub_flag(msname, threshold_list=[10, 7, 5], ncpu=max(1, ncpu)) except Exception: logger.exception(traceback.print_exc()) return ( 0, applycal_gaintable, rms_DR, rms, final_image, final_model, final_residual, leakage_info_list, max_pixel_offset, ) except Exception: logger.exception(traceback.print_exc()) return 4, applycal_gaintable, 0, 0, "", "", "", [], 0 finally: os.chdir(cwd)