"""paircars.pipeline.move_solarcenter: move measurement-set phase centers to the Sun."""

import logging
import numpy as np
import argparse
import time
import sys
import os
from dask import delayed
from paircars.utils.basic_utils import (
    print_banner,
    capture_all_output,
)
from paircars.utils.logger_utils import (
    SmartDefaultsHelpFormatter,
    clean_shutdown,
    init_logger,
    get_logger_safe,
)
from paircars.utils.mwa_utils import get_ncoarse
from paircars.utils.proc_manage_utils import (
    scale_worker_and_wait,
    get_local_dask_cluster,
)
from paircars.utils.resource_utils import drop_cache
from paircars.utils.sunpos_utils import move_to_sun
from paircars.utils.udocker_utils import (
    check_udocker_container,
    initialize_wsclean_container,
)

# Keep dask's distributed scheduler and its tornado event loop from flooding
# the pipeline logs with internal chatter.
logging.getLogger("tornado.application").setLevel(logging.CRITICAL)
logging.getLogger("distributed").setLevel(logging.ERROR)


def move_to_sun_wrapper(*args, **kwargs):
    """Run ``move_to_sun`` while capturing everything it prints.

    All positional and keyword arguments are forwarded unchanged to
    ``move_to_sun``; the first positional argument is echoed back so the
    caller can tell which measurement set a result belongs to.

    Returns
    -------
    tuple
        ``(args[0], move_to_sun result, captured stdout, captured stderr)``.
    """
    with capture_all_output() as (stdout_buf, stderr_buf):
        status = move_to_sun(*args, **kwargs)
    msname = args[0]
    captured_out = stdout_buf.getvalue()
    captured_err = stderr_buf.getvalue()
    return msname, status, captured_out, captured_err
def _start_remote_logger(logger, workdir, logfile, start_remote_log):
    """Start remote log forwarding if requested and its prerequisites exist.

    Returns the observer produced by ``init_logger`` or ``None`` when remote
    logging is disabled, the credential file is missing, or the log file
    does not exist.
    """
    observer = None
    credential_file = f"{workdir}/.jobname_password.npy"
    if (
        start_remote_log
        and os.path.exists(credential_file)
        and logfile is not None
    ):
        time.sleep(1)
        # NOTE(review): allow_pickle=True unpickles the file contents. This is
        # only acceptable because the credential file is expected to be written
        # by this pipeline inside workdir; never point it at untrusted data.
        jobname, password = np.load(credential_file, allow_pickle=True)
        if os.path.exists(logfile):
            # NOTE(review): the remote logger name "do_flagging" looks copied
            # from the flagging pipeline; confirm before renaming, since remote
            # consumers may key on it.
            observer = init_logger(
                "do_flagging", logfile, jobname=jobname, password=password
            )
    if observer is None:
        logger.info("Not transmiting to remote logger.")
    return observer


def _ensure_wsclean_container(logger, container_name):
    """Return the WSClean udocker container name, initializing it if needed.

    Returns ``None`` (after logging a critical message that names the
    requested container) when initialization fails.
    """
    if check_udocker_container(container_name):
        return container_name
    logger.debug(f"Initializing {container_name}.")
    initialized = initialize_wsclean_container(name=container_name, verbose=True)
    if initialized is None:
        logger.critical(
            f"Container {container_name} is not initiated. First initiate container and then run."
        )
    return initialized


def _omp_thread_count():
    """Return the CPU count per task from ``OMP_NUM_THREADS`` (at least 1)."""
    try:
        return max(1, int(os.environ.get("OMP_NUM_THREADS", "1")))
    except ValueError:
        # A malformed value should not abort the whole run; fall back to 1.
        return 1


def _log_worker_output(logger, msname, out_text, err_text):
    """Replay the stdout/stderr captured on a dask worker into the debug log."""
    logger.debug("================")
    logger.debug(f"Worker log for: {os.path.basename(msname)}")
    logger.debug("================")
    for line in out_text.splitlines():
        logger.debug(line)
    for line in err_text.splitlines():
        logger.debug(line)


def main(
    mslist,
    workdir="",
    cpu_frac=0.8,
    mem_frac=0.8,
    logfile=None,
    jobid=0,
    verbose=False,
    start_remote_log=False,
    dask_client=None,
):
    """
    Move the phase center of each measurement set to the solar center.

    Each measurement set is processed by ``move_to_sun`` as a separate dask
    task; worker output is replayed into the debug log.

    Parameters
    ----------
    mslist : str or iterable of str
        Comma-separated string (or iterable) of measurement set paths.
        Blank entries and surrounding whitespace are ignored.
    workdir : str, optional
        Working directory. Defaults to ``<dir of first MS>/workdir``.
    cpu_frac : float, optional
        Fraction of total CPU resources to use (capped at 0.8). Default is 0.8.
    mem_frac : float, optional
        Fraction of total memory to use (capped at 0.8). Default is 0.8.
    logfile : str or None, optional
        Path to the log file used for remote logging. If None, remote
        logging is skipped.
    jobid : int, optional
        Accepted for interface compatibility; not used by this function.
    verbose : bool, optional
        Enable debug-level logs.
    start_remote_log : bool, optional
        Whether to enable remote logging using credentials in the workdir.
        Default is False.
    dask_client : dask.distributed.Client, optional
        Existing client to use. When None, a local cluster is created and
        torn down by this function; a caller-supplied client is left running.

    Returns
    -------
    int
        0 if at least one measurement set succeeded, otherwise 1.
    int
        Number of measurement sets that succeeded.
    int
        Number of measurement sets that failed.
    """
    import shutil

    logger = get_logger_safe()
    if verbose:
        logger.setLevel(logging.DEBUG)
    # Never use more than 80% of the machine, whatever the caller asks for.
    cpu_frac = min(0.8, abs(cpu_frac))
    mem_frac = min(0.8, abs(mem_frac))

    if isinstance(mslist, str):
        mslist = mslist.split(",")
    # "".split(",") yields [""], so blank entries must be filtered before the
    # emptiness check (and before mslist[0] is used to derive workdir).
    mslist = [str(ms).strip() for ms in mslist if str(ms).strip()]
    if not mslist:
        logger.critical("Please provide a valid measurement set list.")
        return 1, 0, 0

    if not workdir:
        workdir = os.path.dirname(os.path.abspath(mslist[0])) + "/workdir"
    os.makedirs(workdir, exist_ok=True)
    os.chdir(workdir)
    logger.debug(f"Current working directory: {os.getcwd()}")

    ############
    # Logger
    ############
    observer = _start_remote_logger(logger, workdir, logfile, start_remote_log)

    succeed = 0
    failed = len(mslist)

    ###########################
    # WSClean container
    ###########################
    if _ensure_wsclean_container(logger, "paircarswsclean") is None:
        clean_shutdown(observer)
        return 1, succeed, failed

    total_ncoarse = max(1, sum(get_ncoarse(ms) for ms in mslist))
    logger.debug(f"Total coarse channels: {total_ncoarse}")

    # dask_dir is only set when this function creates the cluster, so cleanup
    # never touches a directory owned by a caller-supplied client.
    dask_cluster = None
    dask_dir = None
    nworker = 0
    if dask_client is None:
        dask_client, dask_cluster, dask_dir, nworker = get_local_dask_cluster(
            workdir,
            cpu_frac=cpu_frac,
            mem_frac=mem_frac,
            max_worker=len(mslist) + 1,
        )
        if dask_client is None:
            logger.critical("Error occured in creating local cluster.")
            clean_shutdown(observer)
            return 1, succeed, failed

    msg = 1
    try:
        # Inside the try so a scaling failure still tears the cluster down.
        if dask_cluster is not None:
            scale_worker_and_wait(dask_cluster, dask_client, nworker)
        for banner in print_banner(
            "Starting to move phasecenter to solarcenters.", no_print=True
        ).splitlines():
            logger.info(banner)

        workers = dask_client.scheduler_info()["workers"]
        njobs = len(workers)
        mem_limit = round(
            min(w["memory_limit"] / 1024**3 for w in workers.values()), 3
        )
        n_threads = _omp_thread_count()
        logger.info("#################################")
        logger.info(f"Total dask worker: {njobs}")
        logger.info(f"CPU per worker: {n_threads}")
        logger.info(f"Memory per worker: {mem_limit} GB")
        logger.info("#################################")

        tasks = [
            delayed(move_to_sun_wrapper)(msname, ncpu=n_threads)
            for msname in mslist
        ]
        worker_results = dask_client.gather(dask_client.compute(tasks))
        results = []
        for msname, result, out_text, err_text in worker_results:
            results.append(result)
            _log_worker_output(logger, msname, out_text, err_text)

        # move_to_sun's result is summed as a per-MS failure count.
        failed = sum(results)
        succeed = len(mslist) - failed
        logger.info(f"Total measurement sets: {len(mslist)}")
        logger.info(f"Total success: {succeed}")
        logger.info(f"Total failure: {failed}")
        msg = 1 if failed == len(mslist) else 0
    except Exception:
        logger.exception("Exception occured in moving phasecenter to solar center.")
        msg = 1
    finally:
        time.sleep(5)
        clean_shutdown(observer)
        for ms in mslist:
            drop_cache(ms)
        if dask_cluster is not None:
            dask_client.shutdown()
            dask_client.close()
            dask_cluster.close()
        drop_cache(workdir)
        if dask_dir:
            # shutil avoids shell quoting problems of `rm -rf {path}`.
            shutil.rmtree(dask_dir, ignore_errors=True)
    return msg, succeed, failed
def cli():
    """Command-line entry point for moving measurement-set phase centers to the Sun.

    Prints the help text to stderr and returns 1 when invoked without
    arguments; otherwise parses the arguments, runs ``main`` and returns its
    status code.
    """
    parser = argparse.ArgumentParser(
        description="Move phasecenter of the measurement set to the Sun",
        formatter_class=SmartDefaultsHelpFormatter,
    )

    essential = parser.add_argument_group(
        "###################\nEssential parameters\n###################"
    )
    essential.add_argument(
        "mslist", type=str, help="Measurement set list, comma separated"
    )
    essential.add_argument(
        "--workdir", type=str, default="", help="Name of work directory"
    )

    advanced = parser.add_argument_group(
        "###################\nAdvanced parameters\n###################"
    )
    advanced.add_argument("--verbose", action="store_true", help="Verbose logs")
    advanced.add_argument("--jobid", type=int, default=0, help="Job ID")

    hardware = parser.add_argument_group(
        "###################\nHardware resource management parameters\n###################"
    )
    for flag, help_text in (
        ("--cpu_frac", "CPU fraction to use"),
        ("--mem_frac", "Memory fraction to use"),
    ):
        hardware.add_argument(flag, type=float, default=0.8, help=help_text)

    # No arguments at all: show usage instead of an argparse error.
    if len(sys.argv) == 1:
        parser.print_help(sys.stderr)
        return 1

    options = parser.parse_args()
    status, _succeeded, _failed = main(
        options.mslist,
        workdir=options.workdir,
        cpu_frac=options.cpu_frac,
        mem_frac=options.mem_frac,
        jobid=options.jobid,
        verbose=options.verbose,
    )
    return status