# Source code for paircars.pipeline.run_paircars

import os
import sys
import traceback
import argparse
from paircars.utils.basic_utils import (
    check_port_status,
    get_free_port,
    check_permission,
)
from paircars.utils.logger_utils import SmartDefaultsHelpFormatter
from paircars.utils.proc_manage_utils import (
    get_scheduler_name,
    submit_local_master_flow,
    get_jobid,
)
from paircars.utils.prefect_setup_utils import (
    prefect_server_status,
    start_prefect_server,
    stop_prefect_server,
)
from paircars.clusterutils.slurm_cluster import submit_slurm_master_flow


def _build_parser():
    """Build the argument parser that defines the P-AIRCARS command line."""
    parser = argparse.ArgumentParser(
        description="Run P-AIRCARS for calibration and imaging of solar observations.",
        formatter_class=SmartDefaultsHelpFormatter,
    )

    # === Essential parameters ===
    essential = parser.add_argument_group(
        "###################\nEssential parameters\n###################"
    )
    essential.add_argument(
        "target_datadir", type=str, help="Target measurement set directory"
    )
    essential.add_argument(
        "--workdir",
        type=str,
        dest="workdir",
        required=True,
        help="Working directory",
    )
    essential.add_argument(
        "--outdir",
        type=str,
        dest="outdir",
        required=True,
        help="Output products directory",
    )
    essential.add_argument(
        "--target_metafits",
        type=str,
        default="",
        dest="target_metafits",
        help="Target metafits file",
    )
    essential.add_argument(
        "--cal_datadir",
        type=str,
        default="",
        dest="cal_datadir",
        help="Calibrator measurement set directory",
    )
    essential.add_argument(
        "--cal_metafits",
        type=str,
        default="",
        dest="cal_metafits",
        help="Calibrator metafits file",
    )

    # === Advanced calibration parameters ===
    advanced_cal = parser.add_argument_group(
        "###################\nAdvanced calibration parameters\n###################"
    )
    advanced_cal.add_argument(
        "--solint",
        type=str,
        default="60s",
        help="Solution interval for calibration (e.g. 'int', '10s', '5min', 'inf')",
    )
    advanced_cal.add_argument(
        "--cal_uvrange",
        type=str,
        default="",
        help="UV range to filter data for calibration (e.g. '>100klambda', '100~10000lambda')",
    )
    advanced_cal.add_argument(
        "--no_polcal",
        action="store_false",
        dest="do_polcal",
        help="Disable polarization calibration",
    )
    advanced_cal.add_argument(
        "--only_amplitude",
        action="store_true",
        help="Apply only amplitude part of gain solution from calibrator or not",
    )
    advanced_cal.add_argument(
        "--redo_basic_cal",
        action="store_true",
        help="Redo basic calibration or not",
    )
    advanced_cal.add_argument(
        "--redo_selfcal",
        action="store_true",
        help="Redo self-calibration or not",
    )
    advanced_cal.add_argument(
        "--use_solarflagger",
        action="store_true",
        help="Use solar flagger",
    )

    # === Advanced imaging parameters ===
    advanced_image = parser.add_argument_group(
        "###################\nAdvanced imaging parameters\n###################"
    )
    advanced_image.add_argument(
        "--freqrange",
        type=str,
        default="",
        help="Frequency range in MHz to select during imaging (comma-separated, e.g. '100~110,130~140')",
    )
    advanced_image.add_argument(
        "--timerange",
        type=str,
        default="",
        help="Time range to select during imaging (comma-separated, e.g. '2014/09/06/09:30:00~2014/09/06/09:45:00,2014/09/06/10:30:00~2014/09/06/10:45:00')",
    )
    advanced_image.add_argument(
        "--image_freqres",
        type=float,
        default=1.28,
        help="Output image frequency resolution in MHz (-1 = full)",
    )
    advanced_image.add_argument(
        "--image_timeres",
        type=float,
        default=10.0,
        help="Output image time resolution in seconds (-1 = full)",
    )
    advanced_image.add_argument(
        "--pol",
        type=str,
        default="IQUV",
        help="Stokes parameter(s) to image ('I' or 'IQUV')",
    )
    advanced_image.add_argument(
        "--minuv",
        type=float,
        default=0,
        help="Minimum baseline length (in wavelengths) to include in imaging",
    )
    advanced_image.add_argument(
        "--weight",
        type=str,
        default="briggs",
        help="Imaging weighting scheme (e.g. 'briggs', 'natural', 'uniform')",
    )
    advanced_image.add_argument(
        "--robust",
        type=float,
        default=0.0,
        help="Robust parameter for Briggs weighting (-2 to +2)",
    )
    advanced_image.add_argument(
        "--no_multiscale",
        action="store_false",
        dest="use_multiscale",
        help="Disable multiscale CLEAN for extended structures",
    )
    advanced_image.add_argument(
        "--clean_threshold",
        type=float,
        default=1.0,
        help="Clean threshold in sigma for final deconvolution",
    )
    advanced_image.add_argument(
        "--no_pbcor",
        action="store_false",
        dest="do_pbcor",
        help="Do not apply primary beam correction after imaging",
    )
    advanced_image.add_argument(
        "--cutout_rsun",
        type=float,
        default=10.0,
        help="Field of view cutout radius in solar radii",
    )
    advanced_image.add_argument(
        "--no_solar_mask",
        action="store_false",
        dest="use_solar_mask",
        help="Disable use solar disk mask during deconvolution",
    )
    advanced_image.add_argument(
        "--do_overlay",
        action="store_true",
        dest="make_overlay",
        help="Make overlay plot on EUV images",
    )

    # === Advanced options ===
    advanced = parser.add_argument_group(
        "###################\nAdvanced pipeline parameters\n###################"
    )
    advanced.add_argument(
        "--make_msplot",
        action="store_true",
        help="Make diagnostic plots of measurement sets",
    )
    advanced.add_argument(
        "--non_solar_data",
        action="store_false",
        dest="solar_data",
        help="Disable solar data mode",
    )
    advanced.add_argument(
        "--no_ds",
        action="store_false",
        dest="make_ds",
        help="Disable making solar dynamic spectra",
    )
    advanced.add_argument(
        "--do_forcereset_weightflag",
        action="store_true",
        help="Force reset of weights and flags (disabled by default)",
    )
    advanced.add_argument(
        "--no_cal_flag",
        action="store_false",
        dest="do_cal_flag",
        help="Disable initial flagging of calibrators",
    )
    advanced.add_argument(
        "--no_import_model",
        action="store_false",
        dest="do_import_model",
        help="Disable model import",
    )
    advanced.add_argument(
        "--no_basic_cal",
        action="store_false",
        dest="do_basic_cal",
        help="Disable basic gain calibration",
    )
    advanced.add_argument(
        "--do_sidereal_cor",
        action="store_true",
        dest="do_sidereal_cor",
        help="Sidereal motion correction for Sun (disabled by default)",
    )
    advanced.add_argument(
        "--no_solarcenter_move",
        action="store_false",
        dest="do_move_solarcenter",
        help="Disable moving phaseceneter to solar center",
    )
    advanced.add_argument(
        "--no_selfcal",
        action="store_false",
        dest="do_selfcal",
        help="Disable self-calibration",
    )
    advanced.add_argument(
        "--no_ap_selfcal",
        action="store_false",
        dest="do_ap_selfcal",
        help="Disable amplitude-phase self-calibration",
    )
    advanced.add_argument(
        "--no_solar_selfcal",
        action="store_false",
        dest="solar_selfcal",
        help="Disable solar-specific self-calibration parameters",
    )
    advanced.add_argument(
        "--no_applycal",
        action="store_false",
        dest="do_applycal",
        help="Disable application of basic calibration solutions",
    )
    advanced.add_argument(
        "--no_apply_selfcal",
        action="store_false",
        dest="do_apply_selfcal",
        help="Disable application of self-calibration solutions",
    )
    advanced.add_argument(
        "--no_imaging",
        action="store_false",
        dest="do_imaging",
        help="Disable final imaging",
    )
    advanced.add_argument(
        "--verbose",
        action="store_true",
        help="Verbose logs",
    )

    # === Advanced local system/ per node hardware resource parameters ===
    advanced_resource = parser.add_argument_group(
        "###################\nAdvanced hardware resource parameters for local system or per node on HPC cluster\n###################"
    )
    advanced_resource.add_argument(
        "--cpu_frac",
        type=float,
        default=0.8,
        help="Fraction of CPU usuage per node",
    )
    advanced_resource.add_argument(
        "--mem_frac",
        type=float,
        default=0.8,
        help="Fraction of memory usuage per node",
    )
    advanced_resource.add_argument(
        "--max_worker",
        type=int,
        default=-1,
        help="Maximum number of workers",
    )
    advanced_resource.add_argument(
        "--keep_backup",
        action="store_true",
        help="Keep backup of intermediate steps",
    )
    advanced_resource.add_argument(
        "--no_calibrated_ms",
        action="store_false",
        dest="keep_calibrated_ms",
        help="Keep calibrated measurement sets or not",
    )
    advanced_resource.add_argument(
        "--no_remote_logger",
        action="store_false",
        dest="remote_logger",
        help="Disable remote logger",
    )
    advanced_resource.add_argument(
        "--log2term",
        action="store_true",
        help="Show logs in terminal",
    )
    advanced_resource.add_argument(
        "--job_password",
        type=str,
        default=None,
        help="User specified job password",
    )
    advanced_resource.add_argument(
        "--cluster",
        action="store_true",
        dest="cluster",
        help="Running in cluster environment",
    )
    advanced_resource.add_argument(
        "--port",
        type=int,
        default=4260,
        help="Prefect port",
    )

    # === Advanced job scheduler parameters ===
    advanced_slurm = parser.add_argument_group(
        "###################\nAdvanced slurm cluster settings\n###################"
    )
    advanced_slurm.add_argument(
        "--partition",
        type=str,
        default=None,
        help="Partition name (Required)",
    )
    advanced_slurm.add_argument(
        "--account",
        type=str,
        default=None,
        help="Account name (If your cluster requires this, you should provide. Otherwise job can not be started)",
    )
    advanced_slurm.add_argument(
        "--walltime",
        type=str,
        default=None,
        help="Wall time, each slurm job can execute in maximum this time",
    )
    return parser


def _remove_paths(*paths):
    """Best-effort removal of files or directory trees; empty or missing paths are skipped."""
    import shutil

    for path in paths:
        if not path:
            # Nothing was reported for this slot (None or ""), so there is nothing to delete.
            continue
        target = os.path.expanduser(str(path))
        try:
            if os.path.isdir(target) and not os.path.islink(target):
                shutil.rmtree(target, ignore_errors=True)
            elif os.path.lexists(target):
                os.remove(target)
        except OSError:
            # Cleanup of leftover server files must never abort the run.
            pass


def cli():
    """
    Command line entry point of the P-AIRCARS pipeline.

    Parses the command line, validates the target and calibrator data
    directories with ``check_permission``, starts a prefect server through
    ``start_prefect_server`` when ``prefect_server_status`` reports that none
    is running, and finally submits the master flow either locally or through
    slurm, depending on ``--cluster`` and the detected scheduler name.

    When called without any command line argument, the help text is written
    to stderr and the process exits with status 1.

    Returns
    -------
    int
        1 for every failure detected in this function (``~`` paths, missing
        permission on the target directory, prefect server failure on a
        cluster, unsupported scheduler, exception during submission).
        Otherwise the status returned by the submit function, where 0 is
        treated as success.
    """
    parser = _build_parser()
    if len(sys.argv) == 1:
        parser.print_help(sys.stderr)
        sys.exit(1)
    args = parser.parse_args()

    # "~" is not expanded anywhere below, so such paths are rejected up front.
    if args.target_datadir.startswith("~"):
        print("Please provide full path of target directory.")
        return 1
    if args.workdir.startswith("~"):
        print("Please provide full path of work directory.")
        return 1
    if args.outdir.startswith("~"):
        print("Please provide full path of output directory.")
        return 1

    if check_permission(args.target_datadir) is False:
        print(
            f"Do not have permission for target data directory: {args.target_datadir}"
        )
        # Report failure like every other validation error (was a bare return).
        return 1

    # Keep only calibrator directories that pass the permission check. Empty
    # entries are dropped first, otherwise the default "" would be checked as
    # a directory and reported as a permission problem.
    requested_cal_dirs = [
        entry.strip() for entry in args.cal_datadir.split(",") if entry.strip()
    ]
    usable_cal_dirs = []
    for cal_datadir in requested_cal_dirs:
        if check_permission(cal_datadir) is False:
            print(
                f"Do not have permission for calibrator data directory: {cal_datadir}"
            )
        else:
            usable_cal_dirs.append(cal_datadir)
    args.cal_datadir = ",".join(usable_cal_dirs)

    jobid = get_jobid()
    scheduler_name = get_scheduler_name()

    if prefect_server_status(scheduler_name=scheduler_name) is False:
        print("Prefect server is not running.")
        print("Prefect setup is initiating ....")
        port = args.port
        # The postgres port is derived from the requested prefect port.
        postgres_port = port + 1000
        # NOTE(review): a False port status presumably means the port is not
        # usable - confirm against check_port_status. A replacement port is
        # only searched for when a job scheduler other than "local" is used.
        if check_port_status(port) is False and scheduler_name != "local":
            port = get_free_port(start_port=port, end_port=port + 990)
        if check_port_status(postgres_port) is False and scheduler_name != "local":
            postgres_port = get_free_port(
                start_port=postgres_port, end_port=postgres_port + 990
            )
        msg, config_file, profile_path, env_file, dashboard, pid_file = (
            start_prefect_server(port, postgres_port, scheduler_name=scheduler_name)
        )
        if msg != 0:
            if scheduler_name != "local":
                print(
                    f"P-AIRCARS will not work in prefect ephemeral mode in cluster environment with job scheduler: {scheduler_name}"
                )
                return 1
            print(
                "Error in starting prefect server at port. P-AIRCARS will use ephemeral mode in local cluster."
            )
            try:
                stop_prefect_server(scheduler_name=scheduler_name)
            except Exception:
                # Stopping a half-started server is best effort; only show the
                # reason when the user asked for verbose output.
                if args.verbose:
                    traceback.print_exc()
            # Remove the files of the failed server start without going
            # through a shell, so unusual characters in paths are harmless.
            _remove_paths(config_file, profile_path, env_file, pid_file)

    try:
        ############################################
        # Submitting batch script
        ############################################
        if not args.cluster or scheduler_name == "local":
            submit_master_flow = submit_local_master_flow
        elif scheduler_name == "slurm":
            submit_master_flow = submit_slurm_master_flow
        else:
            print(
                f"P-AIRCARS currently does not support {scheduler_name} job scheduler."
            )
            return 1
        msg = submit_master_flow(args, jobid)
        if msg != 0:
            print(
                "Some error may occured in batch script execution, while P-AIRCARS may be successully completed."
            )
        return msg
    except Exception:
        print("Error occured in executing P-AIRCARS master flow.")
        traceback.print_exc()
        return 1
if __name__ == "__main__":
    # Propagate the status returned by cli() as the process exit code, so a
    # failed run is visible to shells and job schedulers.
    sys.exit(cli())