# Source code for paircars.pipeline.mwa_make_ds

import logging
import numpy as np
import argparse
import warnings
import time
import glob
import sys
import os
from dask import delayed
from paircars.utils.basic_utils import (
    print_banner,
    capture_all_output,
)
from paircars.utils.ds_utils import calc_dynamic_spectrum
from paircars.utils.logger_utils import (
    SmartDefaultsHelpFormatter,
    clean_shutdown,
    init_logger,
    get_logger_safe,
)
from paircars.utils.mwa_ploting_utils import make_ds_plot
from paircars.utils.mwa_utils import get_MWA_OBSID, get_ncoarse
from paircars.utils.proc_manage_utils import (
    scale_worker_and_wait,
    get_local_dask_cluster,
)
from paircars.utils.resource_utils import drop_cache

# Keep third-party chatter out of the pipeline log: dask's "distributed"
# logger only reports errors, tornado's application logger only critical ones.
for _name, _level in (
    ("distributed", logging.ERROR),
    ("tornado.application", logging.CRITICAL),
):
    logging.getLogger(_name).setLevel(_level)
del _name, _level


def calc_dynamic_spectrum_wrapper(*args, **kwargs):
    """Run ``calc_dynamic_spectrum`` while capturing the output it emits.

    Meant to run as a dask task: the two buffers yielded by
    ``capture_all_output`` (presumably stdout and stderr) are returned as
    text so the driver process can replay them into its own log.

    Parameters
    ----------
    *args, **kwargs
        Forwarded unchanged to ``calc_dynamic_spectrum``. ``args[0]`` is
        echoed back as the first element of the result (callers in this
        module pass the measurement set name there).

    Returns
    -------
    tuple
        ``(args[0], calc_dynamic_spectrum_result, captured_out, captured_err)``
    """
    with capture_all_output() as (captured_out, captured_err):
        spectrum_result = calc_dynamic_spectrum(*args, **kwargs)
    first_arg = args[0]
    out_text = captured_out.getvalue()
    err_text = captured_err.getvalue()
    return first_arg, spectrum_result, out_text, err_text
def _remove_goes_files(ds_dir):
    """Best-effort removal of intermediate GOES ``sci*.nc`` files in ``ds_dir``.

    Paths are removed with ``os``/``shutil`` instead of spawning ``rm -rf``
    through a shell, so names with spaces or shell metacharacters are safe.
    Errors are ignored, matching the previous ``rm -rf`` behaviour.
    """
    import shutil

    for path in glob.glob(f"{ds_dir}/sci*.nc"):
        if os.path.isdir(path) and not os.path.islink(path):
            shutil.rmtree(path, ignore_errors=True)
        else:
            try:
                os.remove(path)
            except OSError:
                # Deliberate best-effort cleanup; a leftover file is harmless.
                pass


def make_solar_DS(
    mslist,
    dask_client,
    metafits,
    workdir,
    outdir,
    plot_quantity="TB",
    extension="png",
    showgui=False,
    overwrite=False,
    n_threads=1,
    mem_limit=1,
    njobs=1,
    logger=None,
):
    """
    Make solar dynamic spectrum and plots

    Parameters
    ----------
    mslist : list
        Measurement set list (Provide only same obsid measurement set list)
    dask_client: dask.client
        Dask client used to run one ``calc_dynamic_spectrum`` task per
        measurement set
    metafits : str
        Metafits file
    workdir : str
        Work directory (not used by this function; kept for API compatibility)
    outdir : str
        Output directory; products are written to ``<outdir>/dynamic_spectra``
    plot_quantity : str, optional
        Plotting quantity (TB or flux)
    extension : str, optional
        Image file extension
    showgui : bool, optional
        Show GUI
    overwrite : bool, optional
        Overwrite plot
    n_threads : int, optional
        CPU threads to use per task
    mem_limit : float, optional
        Memory to use in GB (not used by this function; kept for API
        compatibility)
    njobs : int, optional
        Number of tasks submitted to dask per batch (values below 1 are
        treated as 1)
    logger : logging.Logger, optional
        Logger to use; defaults to ``get_logger_safe()``

    Returns
    -------
    int
        Success message (0 on success, 1 on failure)
    str
        Plot file name ("" on failure)
    int
        Succeeded ms number
    int
        Failed ms number
    """
    if logger is None:
        logger = get_logger_safe()
    warnings.filterwarnings("ignore", category=RuntimeWarning)
    ds_dir = f"{outdir}/dynamic_spectra"
    os.makedirs(ds_dir, exist_ok=True)
    if len(mslist) == 0:
        logger.critical("Please provide a valid measurement set list.")
        # Same 4-tuple shape as every other return path, so callers that
        # unpack (msg, plot_file, succeed, failed) do not crash.
        return 1, "", 0, 0

    succeed = 0
    failed = len(mslist)
    obsid = get_MWA_OBSID(mslist[0])
    ds_file_name = f"{obsid}_ds"
    plot_file = f"{ds_dir}/{ds_file_name}.{extension}"
    if not overwrite and os.path.exists(plot_file):
        logger.info("Dynamic spectrum already exists.")
        return 0, plot_file, len(mslist), 0

    try:
        ###########################################
        # Computing dynamic spectra
        ###########################################
        # range() rejects a step of 0 (e.g. when no dask worker is reported).
        batch_size = max(1, int(njobs))
        tasks = [
            delayed(calc_dynamic_spectrum_wrapper)(
                msname,
                metafits,
                ds_dir,
                n_threads=n_threads,
            )
            for msname in mslist
        ]
        logger.info("Start making dynamic spectra.")
        result_wrapper = []
        # Submit at most ``batch_size`` tasks at a time and wait for each batch.
        for start in range(0, len(tasks), batch_size):
            futures = dask_client.compute(tasks[start : start + batch_size])
            result_wrapper.extend(dask_client.gather(futures))

        ds_files = []
        for msname, result, out_text, err_text in result_wrapper:
            # Replay the output captured on the worker into the local log.
            logger.debug("================")
            logger.debug(f"Worker log for: {os.path.basename(msname)}")
            logger.debug("================")
            for line in out_text.splitlines():
                logger.debug(line)
            for line in err_text.splitlines():
                logger.debug(line)
            # NOTE(review): assumes a failed task yields None (or a None first
            # element); such results count as failures instead of being
            # handed to the plotter.
            if result is None or result[0] is None:
                continue
            ds_files.append(result[0])

        succeed = len(ds_files)
        failed = len(mslist) - succeed
        logger.info(f"Total measurement sets: {len(mslist)}")
        logger.info(f"Total success: {succeed}")
        logger.info(f"Total failure: {failed}")
        logger.info(f"DS files: {[os.path.basename(i) for i in ds_files]}")
        if not ds_files:
            logger.error("No dynamic spectrum was produced; skipping plot.")
            return 1, "", succeed, failed

        ###########################################
        # Plotting dynamic spectrum
        ###########################################
        logger.info("Making dynamic spectra plots.")
        plot_file = make_ds_plot(
            ds_files,
            plot_file=plot_file,
            plot_quantity=plot_quantity,
            showgui=showgui,
        )
        _remove_goes_files(ds_dir)
        return 0, plot_file, succeed, failed
    except Exception:
        logger.exception("Exception occured in making dynamic spectra.")
        return 1, "", succeed, failed
def _split_mslist(mslist):
    """Return the non-empty, whitespace-stripped paths in ``mslist``.

    ``mslist`` is normally a comma-separated string; a list/tuple of paths
    is accepted as well.
    """
    parts = mslist.split(",") if isinstance(mslist, str) else list(mslist)
    return [str(part).strip() for part in parts if str(part).strip()]


def _worker_resources(dask_client):
    """Return ``(n_workers, threads_per_task, min_worker_memory_gb)``.

    Threads come from ``OMP_NUM_THREADS``. Only the first entry of a
    comma-separated OpenMP nesting list is used. The value falls back to 1
    when the variable is unset or unparsable. Memory is 0 when no worker is
    reported.
    """
    workers = dask_client.scheduler_info()["workers"]
    mem_gb = [w["memory_limit"] / 1024**3 for w in workers.values()]
    mem_limit = round(min(mem_gb), 3) if mem_gb else 0
    raw_threads = os.environ.get("OMP_NUM_THREADS")
    try:
        n_threads = max(1, int(raw_threads.split(",")[0])) if raw_threads else 1
    except ValueError:
        n_threads = 1
    return len(workers), n_threads, mem_limit


def main(
    mslist,
    metafits,
    workdir,
    outdir,
    plot_quantity="TB",
    extension="png",
    overwrite=False,
    cpu_frac=0.8,
    mem_frac=0.8,
    logfile=None,
    jobid="0",
    verbose=False,
    start_remote_log=False,
    dask_client=None,
):
    """
    Make dynamic spectra

    Parameters
    ----------
    mslist : str
        Measurement set list (comma separated; empty entries are ignored).
        A list/tuple of paths is also accepted.
    metafits : str
        Metafits file
    workdir : str
        Work directory ("" means ``<dir of first ms>/workdir``)
    outdir : str
        Output directory ("" means the work directory)
    plot_quantity : str
        Plotting quantity (TB or flux)
    extension : str, optional
        Plot extension
    overwrite : bool, optional
        Overwrite existing plot
    cpu_frac : float, optional
        CPU fraction (capped at 0.8)
    mem_frac : float, optional
        Memory fraction (capped at 0.8)
    logfile : str, optional
        Log file
    jobid : str, optional
        Job ID (not used by this function)
    verbose : bool, optional
        Verbose logs
    start_remote_log : bool, optional
        Start remote log
    dask_client: dask.client, optional
        Dask client; when None a local cluster is created and torn down here

    Returns
    -------
    int
        Success messsage (0 on success, 1 on failure)
    int
        Succeeded ms number
    int
        Failed ms number
    """
    import shutil

    logger = get_logger_safe()
    if verbose:
        logger.setLevel(logging.DEBUG)
    # Never let this job claim more than 80% of the node's CPU or memory.
    cpu_frac = min(0.8, abs(cpu_frac))
    mem_frac = min(0.8, abs(mem_frac))
    # "".split(",") is [""], so filter empties; otherwise the emptiness check
    # below could never trigger and "" would be treated as an MS path.
    mslist = _split_mslist(mslist)
    if len(mslist) == 0:
        logger.critical("Please provide a valid measurement set list.")
        return 1, 0, 0

    if workdir == "":
        workdir = os.path.dirname(os.path.abspath(mslist[0])) + "/workdir"
    os.makedirs(workdir, exist_ok=True)
    os.chdir(workdir)
    logger.debug(f"Current working directory: {os.getcwd()}")
    if outdir == "":
        outdir = workdir
    os.makedirs(outdir, exist_ok=True)
    logger.debug(f"Output directory: {outdir}.")

    ############
    # Logger
    ############
    observer = None
    credentials_file = f"{workdir}/.jobname_password.npy"
    if (
        start_remote_log
        and os.path.exists(credentials_file)
        and logfile is not None
    ):
        time.sleep(5)
        # NOTE(review): allow_pickle=True can execute code from a crafted
        # file; make sure only trusted processes can write this file.
        jobname, password = np.load(credentials_file, allow_pickle=True)
        if os.path.exists(logfile):
            observer = init_logger(
                "ds_plot", logfile, jobname=jobname, password=password
            )

    succeed = 0
    failed = len(mslist)
    total_ncoarse = max(1, sum(get_ncoarse(msname) for msname in mslist))
    logger.debug(f"Total coarse channels: {total_ncoarse}.")

    msg = 1
    dask_cluster = None
    dask_dir = None
    nworker = 0
    # Only a cluster created here is scaled and torn down here.
    owns_cluster = dask_client is None
    if owns_cluster:
        dask_client, dask_cluster, dask_dir, nworker = get_local_dask_cluster(
            workdir,
            cpu_frac=cpu_frac,
            mem_frac=mem_frac,
            max_worker=len(mslist) + 1,
        )
        if dask_client is None:
            logger.critical("Error occured in creating local cluster.")
            clean_shutdown(observer)
            return 1, succeed, failed
    try:
        # Inside the try so the finally block still releases the cluster.
        if owns_cluster:
            scale_worker_and_wait(dask_cluster, dask_client, nworker)
        for banner in print_banner(
            "Starting making dynamic spectra.", no_print=True
        ).splitlines():
            logger.info(banner)
        njobs, n_threads, mem_limit = _worker_resources(dask_client)
        logger.info("#################################")
        logger.info(f"Total dask worker: {njobs}")
        logger.info(f"CPU per worker: {n_threads}")
        logger.info(f"Memory per worker: {mem_limit} GB")
        logger.info("#################################")
        if njobs == 0:
            logger.critical("No dask worker is available.")
        else:
            msg, _, succeed, failed = make_solar_DS(
                mslist,
                dask_client,
                metafits,
                workdir,
                outdir,
                overwrite=overwrite,
                plot_quantity=plot_quantity,
                extension=extension,
                n_threads=n_threads,
                mem_limit=mem_limit,
                njobs=njobs,
                logger=logger,
            )
    except Exception:
        logger.exception("Exception occured in making dynamic spectra.")
        msg = 1
    finally:
        time.sleep(5)
        clean_shutdown(observer)
        for msname in mslist:
            drop_cache(msname)
        if dask_cluster is not None:
            dask_client.shutdown()
            dask_client.close()
            dask_cluster.close()
        drop_cache(workdir)
        # dask_dir is only set when the local cluster was created here.
        if dask_dir:
            shutil.rmtree(dask_dir, ignore_errors=True)
    return msg, succeed, failed
def cli():
    """Command-line entry point for making MWA solar dynamic spectra.

    Returns
    -------
    int
        1 when called without arguments (help is printed to stderr),
        otherwise the status code returned by ``main``.
    """
    parser = argparse.ArgumentParser(
        description="Make MWA dynamic spectra of the Sun",
        formatter_class=SmartDefaultsHelpFormatter,
    )

    # === Essential parameters ===
    essential = parser.add_argument_group(
        "###################\nEssential parameters\n###################"
    )
    essential.add_argument(
        "mslist", type=str, help="Measurement set list (comma separated)"
    )
    essential.add_argument("metafits", type=str, help="Metafits file")
    essential.add_argument(
        "--workdir",
        type=str,
        dest="workdir",
        required=True,
        help="Working directory",
    )
    essential.add_argument(
        "--outdir",
        type=str,
        dest="outdir",
        required=True,
        help="Output directory",
    )

    # === Advanced parameters ===
    adv_args = parser.add_argument_group(
        "###################\nAdvanced parameters\n###################"
    )
    adv_args.add_argument(
        "--plot_quantity",
        type=str,
        default="TB",
        help="Plot quantity (TB or flux)",
    )
    adv_args.add_argument(
        "--extension",
        type=str,
        default="png",
        help="Save file extension",
    )
    adv_args.add_argument(
        "--overwrite",
        action="store_true",
        help="Overwrite existing plot or not",
    )
    adv_args.add_argument("--verbose", action="store_true", help="Verbose logs")
    adv_args.add_argument(
        "--jobid", type=str, default="0", help="Job ID for logging and PID tracking"
    )

    # === Advanced local system/ per node hardware resource parameters ===
    hard_args = parser.add_argument_group(
        "###################\nHardware resource management parameters\n###################"
    )
    hard_args.add_argument(
        "--cpu_frac",
        type=float,
        default=0.8,
        help="Fraction of CPU usage per node",
    )
    hard_args.add_argument(
        "--mem_frac",
        type=float,
        default=0.8,
        help="Fraction of memory usage per node",
    )

    # With no arguments at all, show the full help instead of argparse's
    # terse "missing arguments" error.
    if len(sys.argv) == 1:
        parser.print_help(sys.stderr)
        return 1

    args = parser.parse_args()
    msg, _, _ = main(
        args.mslist,
        args.metafits,
        args.workdir,
        args.outdir,
        plot_quantity=args.plot_quantity,
        extension=args.extension,
        overwrite=args.overwrite,
        cpu_frac=args.cpu_frac,
        mem_frac=args.mem_frac,
        jobid=args.jobid,
        verbose=args.verbose,
    )
    return msg