Source code for paircars.pipeline.do_apply_selfcal

import logging
import numpy as np
import argparse
import time
import glob
import sys
import os
from casatools import msmetadata
from dask import delayed
from astropy.io import fits
from paircars.utils.logger_utils import (
    SmartDefaultsHelpFormatter,
    clean_shutdown,
    init_logger,
    get_logger_safe,
)
from paircars.utils.basic_utils import print_banner
from paircars.utils.ms_metadata import check_datacolumn_valid
from paircars.utils.mwa_utils import freq_to_MWA_coarse, get_ncoarse
from paircars.utils.proc_manage_utils import (
    scale_worker_and_wait,
    get_local_dask_cluster,
)
from paircars.utils.resource_utils import drop_cache
from paircars.pipeline.do_apply_basiccal import applysol_wrapper

logging.getLogger("distributed").setLevel(logging.ERROR)
logging.getLogger("tornado.application").setLevel(logging.CRITICAL)


def _selfcal_table_chan_range(table_path):
    """
    Return the ``(start, end)`` coarse-channel pair encoded in a self-cal table name.

    Parses names of the form ``selfcal_<obsid>_ch_<start>-<end>.gcal`` (the
    pattern globbed in :func:`run_all_applysol`). A name without a ``-`` in
    the channel field yields ``start == end``.

    Raises
    ------
    ValueError
        If the channel fields are not integers.
    """
    chan_field = os.path.basename(table_path).split(".gcal")[0].split("_ch_")[-1]
    parts = chan_field.split("-")
    return int(parts[0]), int(parts[-1])


def run_all_applysol(
    mslist,
    metafits,
    dask_client,
    workdir,
    caldir,
    overwrite_datacolumn=False,
    applymode="calonly",
    force_apply=False,
    n_threads=1,
    mem_limit=1,
    logger=None,
):
    """
    Apply self-calibrator solutions on all target scans

    Every unique measurement set in ``mslist`` is matched by its MWA
    coarse-channel span against the ``selfcal_<GPSTIME>_ch_<start>-<end>.gcal``
    tables found in ``caldir``. One ``applysol_wrapper`` task per matched
    measurement set is computed on ``dask_client``.

    Parameters
    ----------
    mslist : list
        Measurement set list (duplicates are removed)
    metafits: str
        Metafits file (its ``GPSTIME`` header selects the tables)
    dask_client : dask.client
        Dask client
    workdir : str
        Working directory (the process ``chdir``s into it)
    caldir : str
        Calibration directory
    overwrite_datacolumn : bool, optional
        Overwrite data column or not
    applymode : str, optional
        Apply mode
    force_apply : bool, optional
        Force to apply solutions even already applied
    n_threads : int, optional
        CPU threads to use
    mem_limit : float, optional
        Memory to use in GB
    logger : logging.Logger, optional
        Logger to use; ``get_logger_safe()`` is used when None

    Returns
    --------
    int
        Succeeded gain solution ms number
    int
        Failed gain solution ms number. Measurement sets that were invalid or
        had no matching self-calibration table are counted as failed.
    int
        Succeeded polarisation solution ms number
    int
        Failed polarisation solution ms number (same convention as above)
    """
    if logger is None:
        logger = get_logger_safe()
    if len(mslist) == 0:
        logger.critical("Please provide a valid measurement set list.")
        return 0, 0, 0, 0

    gain_succeed = 0
    gain_failed = len(mslist)
    pol_succeed = 0
    pol_failed = len(mslist)
    try:
        # De-duplicate first so every count below refers to unique measurement sets.
        mslist = np.unique(mslist).tolist()
        total_ms = len(mslist)
        gain_failed = total_ms
        pol_failed = total_ms

        os.chdir(workdir)
        logger.debug(f"Current working directory: {os.getcwd()}")

        header = fits.getheader(metafits)
        obsid = header["GPSTIME"]
        selfcal_tables = sorted(glob.glob(f"{caldir}/selfcal_{obsid}_ch_*.gcal"))
        if len(selfcal_tables) == 0:
            logger.error(f"No self-cal caltable is present in {caldir}.")
            return gain_succeed, gain_failed, pol_succeed, pol_failed
        logger.debug(f"Self-calibration tables: {selfcal_tables}")
        chan_ranges = [_selfcal_table_chan_range(t) for t in selfcal_tables]
        logger.debug(
            f"Start coarse channels: {np.array([r[0] for r in chan_ranges])}"
        )
        logger.debug(f"End coarse channels: {np.array([r[1] for r in chan_ranges])}")

        ####################################
        # Filtering any corrupted ms
        ####################################
        valid_mslist = []
        for ms in mslist:
            if check_datacolumn_valid(ms):
                valid_mslist.append(ms)
            else:
                logger.warning(f"Issue in : {ms}")
        if len(valid_mslist) == 0:
            logger.error("No valid measurement set.")
            return gain_succeed, gain_failed, pol_succeed, pol_failed

        ####################################
        # Applycal jobs
        ####################################
        logger.info(f"Total ms list: {len(valid_mslist)}")
        tasks = []
        msmd = msmetadata()
        for ms in valid_mslist:
            msmd.open(ms)
            try:
                freqs = msmd.chanfreqs(0, unit="MHz")
            finally:
                # Release the table even if the metadata query fails.
                msmd.close()
            start_coarse_chan = freq_to_MWA_coarse(np.nanmin(freqs))
            end_coarse_chan = freq_to_MWA_coarse(np.nanmax(freqs))

            gaintable = []
            quartical_table = []
            for table, (start_chan, end_chan) in zip(selfcal_tables, chan_ranges):
                if start_coarse_chan >= start_chan and end_coarse_chan <= end_chan:
                    # A later matching table overrides an earlier one.
                    # The glob guarantees the ".gcal" suffix, so strip exactly that.
                    prefix = table[: -len(".gcal")]
                    gaintable = [
                        f"{prefix}{ext}"
                        for ext in (".gcal", ".bcal")
                        if os.path.exists(f"{prefix}{ext}")
                    ]
                    quartical_table = (
                        [f"{prefix}.dcal"] if os.path.exists(f"{prefix}.dcal") else []
                    )

            if len(gaintable) == 0:
                logger.error(
                    f"Measurement set coarse channel : {start_coarse_chan} to {end_coarse_chan}. Corresponding self-calibration table is not present."
                )
                continue
            if len(quartical_table) == 0:
                logger.warning(
                    f"Measurement set coarse channel : {start_coarse_chan} to {end_coarse_chan}. Corresponding polarisation self-calibration table is not present."
                )
                # Leave a ".nopolselfcal" marker inside the measurement set
                # without spawning a shell.
                try:
                    with open(os.path.join(ms, ".nopolselfcal"), "a"):
                        pass
                except OSError:
                    logger.warning(f"Could not create marker {ms}/.nopolselfcal")
            tasks.append(
                delayed(applysol_wrapper)(
                    ms,
                    workdir,
                    gaintable=gaintable,
                    quartical_table=quartical_table,
                    overwrite_datacolumn=overwrite_datacolumn,
                    applymode=applymode,
                    interp=["linear,linear"],
                    n_threads=n_threads,
                    mem_limit=mem_limit,
                    force_apply=force_apply,
                    soltype="selfcal",
                )
            )

        if len(tasks) == 0:
            logger.error(
                "Applying self-calibration solutions for targets are not done successfully. No suitable calibration solutions are found."
            )
            return gain_succeed, gain_failed, pol_succeed, pol_failed

        result_wrapper = list(dask_client.gather(dask_client.compute(tasks)))
        gain_msg = []
        pol_msg = []
        for r in result_wrapper:
            # Indexed as r[0] = ms path, r[1] = (gain, pol) failure flags, and
            # r[2] / r[3] = captured log text. This is presumably
            # applysol_wrapper's return layout -- confirm against that function.
            logger.debug("================")
            logger.debug(f"Worker log for: {os.path.basename(r[0])}")
            logger.debug("================")
            for line in r[2].splitlines():
                logger.debug(line)
            for line in r[3].splitlines():
                logger.debug(line)
            gain_msg.append(r[1][0])
            pol_msg.append(r[1][1])

        # Invalid measurement sets, and those without a matching table, never
        # got a task. Count them as failures rather than as successes.
        gain_succeed = len(tasks) - sum(gain_msg)
        pol_succeed = len(tasks) - sum(pol_msg)
        gain_failed = total_ms - gain_succeed
        pol_failed = total_ms - pol_succeed

        logger.info(f"Total measurement sets: {total_ms}")
        logger.info(f"Gain solution applied, Succeeded: {gain_succeed}")
        logger.info(f"Gain solution applied, Failed: {gain_failed}")
        logger.info(f"Polarisation solution applied, Succeeded: {pol_succeed}")
        logger.info(f"Polarisation solution applied, Failed: {pol_failed}")
        if gain_failed == 0 and pol_failed == 0:
            logger.info(
                "Applying gain and polarisation self-calibration solutions for targets are done successfully."
            )
        elif gain_failed == 0:
            logger.warning(
                "Applying gain self-calibration solutions for targets are done successfully, but failed for polarisation solutions."
            )
        else:
            logger.error(
                "Applying self-calibration solutions for targets are not done successfully."
            )
    except Exception:
        logger.exception(
            "Applying self-calibration solutions for targets are not done successfully.",
            exc_info=True,
        )
    # Returning outside ``finally`` lets KeyboardInterrupt/SystemExit propagate.
    return gain_succeed, gain_failed, pol_succeed, pol_failed
def main(
    mslist,
    metafits,
    workdir,
    caldir,
    applymode="calonly",
    overwrite_datacolumn=False,
    force_apply=False,
    start_remote_log=False,
    cpu_frac=0.8,
    mem_frac=0.8,
    logfile=None,
    jobid=0,
    verbose=False,
    dask_client=None,
):
    """
    Apply calibration solutions to a list of measurement sets.

    Parameters
    ----------
    mslist : str
        Comma-separated list of measurement set paths to apply calibration to.
        Empty entries (e.g. from a trailing comma) are ignored.
    metafits : str
        Metafits file
    workdir : str
        Directory for logs, intermediate files, and PID tracking. If empty,
        ``<dir of first ms>/workdir`` is used.
    caldir : str
        Directory containing calibration tables (e.g., gain, bandpass, polarization).
    applymode : str, optional
        Mode for applying calibration (e.g., "calonly", "calflag", "flagonly"). Default is "calonly".
    overwrite_datacolumn : bool, optional
        If True, overwrites the existing corrected data column. Default is False.
    force_apply : bool, optional
        If True, applies calibration even if it appears to have been applied already. Default is False.
    start_remote_log : bool, optional
        Whether to enable remote logging using credentials found in `workdir`. Default is False.
    cpu_frac : float, optional
        Fraction of available CPU resources to use (capped at 0.8). Default is 0.8.
    mem_frac : float, optional
        Fraction of system memory to use (capped at 0.8). Default is 0.8.
    logfile : str or None, optional
        Path to the logfile used for remote logging. Default is None.
    jobid : int, optional
        Job ID for PID tracking and logging (currently unused here). Default is 0.
    verbose : bool, optional
        Verbose logs
    dask_client : dask.client, optional
        Existing Dask client. If None, a local cluster is created and torn
        down by this function.

    Returns
    --------
    int
        Succeeded gain solution ms number
    int
        Failed gain solution ms number
    int
        Succeeded polarisation solution ms number
    int
        Failed polarisation solution ms number
    """
    import shutil

    logger = get_logger_safe()
    if verbose:
        logger.setLevel(logging.DEBUG)
    cpu_frac = min(0.8, abs(cpu_frac))
    mem_frac = min(0.8, abs(mem_frac))

    # Drop blank entries so they never reach CASA. Check emptiness before
    # indexing mslist[0] below.
    mslist = [ms for ms in mslist.split(",") if ms.strip()]
    if len(mslist) == 0:
        logger.critical("Please provide a valid measurement set list.")
        return 0, 0, 0, 0

    # The first MS provides the fallback working directory.
    if workdir == "":
        workdir = os.path.dirname(os.path.abspath(mslist[0])) + "/workdir"
    os.makedirs(workdir, exist_ok=True)
    os.chdir(workdir)

    ############
    # Logger
    ############
    observer = None
    if (
        start_remote_log
        and os.path.exists(f"{workdir}/.jobname_password.npy")
        and logfile is not None
    ):
        time.sleep(5)
        # NOTE(review): allow_pickle=True is only safe because this file is
        # written by the pipeline itself inside workdir.
        jobname, password = np.load(
            f"{workdir}/.jobname_password.npy", allow_pickle=True
        )
        if os.path.exists(logfile):
            observer = init_logger(
                "apply_selfcal", logfile, jobname=jobname, password=password
            )
    if observer is None:
        logger.info("Not transmiting to remote logger.")

    gain_succeed = 0
    gain_failed = len(mslist)
    pol_succeed = 0
    pol_failed = len(mslist)
    # Only set when this function creates its own local cluster.
    dask_cluster = None
    dask_dir = None
    try:
        total_ncoarse = 0
        for msname in mslist:
            total_ncoarse += get_ncoarse(msname)
        total_ncoarse = max(1, total_ncoarse)
        logger.debug(f"Total coarse channels: {total_ncoarse}.")

        if dask_client is None:
            dask_client, dask_cluster, dask_dir, nworker = get_local_dask_cluster(
                workdir,
                cpu_frac=cpu_frac,
                mem_frac=mem_frac,
                max_worker=len(mslist) + 1,
            )
            if dask_client is None:
                logger.critical("Error occured in creating local cluster.")
                return 0, 0, 0, 0
            scale_worker_and_wait(dask_cluster, dask_client, nworker)

        for banner in print_banner(
            "Starting applying solutions.", no_print=True
        ).splitlines():
            logger.info(banner)

        client_info = dask_client.scheduler_info()["workers"]
        njobs = len(client_info)
        # Use the smallest worker's memory limit (bytes -> GB) for every task.
        mem_limit = round(
            min(w["memory_limit"] / 1024**3 for w in client_info.values()), 3
        )
        n_threads = os.environ.get("OMP_NUM_THREADS")
        n_threads = int(n_threads) if n_threads else 1

        logger.info("#################################")
        logger.info(f"Total dask worker: {njobs}")
        logger.info(f"CPU per worker: {n_threads}")
        logger.info(f"Memory per worker: {mem_limit} GB")
        logger.info("#################################")

        if caldir == "" or not os.path.exists(caldir):
            logger.error("Provide existing caltable directory.")
        else:
            gain_succeed, gain_failed, pol_succeed, pol_failed = run_all_applysol(
                mslist,
                metafits,
                dask_client,
                workdir,
                caldir,
                overwrite_datacolumn=overwrite_datacolumn,
                applymode=applymode,
                force_apply=force_apply,
                n_threads=n_threads,
                mem_limit=mem_limit,
                logger=logger,
            )
    except Exception:
        logger.exception(
            "Exception occured in applying self-calibration solutions.", exc_info=True
        )
    finally:
        time.sleep(5)
        clean_shutdown(observer)
        for ms in mslist:
            drop_cache(ms)
        # Tear down only a cluster this function created itself.
        if dask_cluster is not None:
            if dask_client is not None:
                dask_client.shutdown()
                dask_client.close()
            dask_cluster.close()
        drop_cache(workdir)
        if dask_dir is not None:
            # No shell involved, so odd characters in the path are harmless.
            shutil.rmtree(dask_dir, ignore_errors=True)
    # Returning outside ``finally`` lets KeyboardInterrupt/SystemExit propagate.
    return gain_succeed, gain_failed, pol_succeed, pol_failed
def cli():
    """
    Command-line entry point: parse arguments and run :func:`main`.

    Returns
    -------
    int
        0 when neither gain nor polarisation application reported failures.
        Otherwise 1; this also applies when invoked with no arguments, in
        which case the help text is printed to stderr.
    """
    parser = argparse.ArgumentParser(
        description="Apply self-calibration solutions to target scans",
        formatter_class=SmartDefaultsHelpFormatter,
    )

    # Essential parameters
    essential = parser.add_argument_group(
        "###################\nEssential parameters\n###################"
    )
    essential.add_argument(
        "mslist",
        type=str,
        help="Comma-separated list of measurement sets (required)",
    )
    essential.add_argument("metafits", type=str, help="Metafits file (required)")
    for flag, text in (
        ("--workdir", "Working directory for intermediate files"),
        ("--caldir", "Directory containing self-calibration tables"),
    ):
        essential.add_argument(flag, type=str, default="", required=True, help=text)

    # Advanced parameters
    advanced = parser.add_argument_group(
        "###################\nAdvanced parameters\n###################"
    )
    advanced.add_argument(
        "--applymode",
        type=str,
        default="calonly",
        help="Applycal mode (e.g. 'calonly', 'calflag')",
    )
    for flag, text in (
        ("--overwrite_datacolumn", "Overwrite corrected data column in MS"),
        ("--force_apply", "Force apply calibration even if already applied"),
        ("--verbose", "Verbose logs"),
    ):
        advanced.add_argument(flag, action="store_true", help=text)
    advanced.add_argument(
        "--jobid", type=str, default="0", help="Job ID for logging and PID tracking"
    )

    # Resource management parameters
    hardware = parser.add_argument_group(
        "###################\nHardware resource management parameters\n###################"
    )
    for flag, text in (
        ("--cpu_frac", "CPU fraction to use"),
        ("--mem_frac", "Memory fraction to use"),
    ):
        hardware.add_argument(flag, type=float, default=0.8, help=text)

    if len(sys.argv) == 1:
        parser.print_help(sys.stderr)
        return 1

    args = parser.parse_args()
    _, gain_failed, _, pol_failed = main(
        args.mslist,
        args.metafits,
        args.workdir,
        args.caldir,
        applymode=args.applymode,
        overwrite_datacolumn=args.overwrite_datacolumn,
        force_apply=args.force_apply,
        cpu_frac=float(args.cpu_frac),
        mem_frac=float(args.mem_frac),
        verbose=args.verbose,
        jobid=args.jobid,
    )
    return 0 if (gain_failed == 0 and pol_failed == 0) else 1