Source code for paircars.pipeline.do_imaging

import logging
import numpy as np
import argparse
import copy
import time
import glob
import sys
import os
from casatools import msmetadata
from dask import delayed
from paircars.utils.basic_utils import (
    timestamp_to_mjdsec,
    mjdsec_to_timestamp,
    print_banner,
)
from paircars.utils.image_utils import (
    create_circular_mask,
    make_stokes_wsclean_imagecube,
)
from paircars.utils.imaging import (
    calc_sun_dia,
    calc_field_of_view,
    calc_npix_in_psf,
    calc_cellsize,
    calc_multiscale_scales,
    get_multiscale_bias,
    get_fft_size,
    calc_uvtaper,
    calc_maxuv,
)
from paircars.utils.logger_utils import (
    SmartDefaultsHelpFormatter,
    clean_shutdown,
    create_logger,
    init_logger,
    get_logger_safe,
)
from paircars.utils.ms_metadata import check_datacolumn_valid
from paircars.utils.mwa_utils import get_ncoarse, get_MWA_OBSID, get_MWA_coarse_chan
from paircars.utils.mwa_ploting_utils import rename_mwasolar_image
from paircars.utils.proc_manage_utils import (
    scale_worker_and_wait,
    get_local_dask_cluster,
)
from paircars.utils.resource_utils import drop_cache
from paircars.utils.udocker_utils import (
    check_udocker_container,
    initialize_wsclean_container,
    run_wsclean,
)

logging.getLogger("distributed").setLevel(logging.ERROR)
logging.getLogger("tornado.application").setLevel(logging.CRITICAL)


[docs] def perform_imaging( msname="", workdir="", datacolumn="CORRECTED_DATA", freqrange="", timerange="", imagedir="", imsize=1024, cellsize=2, image_freqres=-1, image_timeres=-1, pol="I", weight="briggs", robust=0.0, minuv=0, threshold=1.0, use_multiscale=True, use_solar_mask=True, mask_radius=40, savemodel=True, saveres=True, cutout_rsun=10.0, make_plots=True, logfile="imaging.log", ncpu=1, mem=1, ): """ Perform spectropolarimetric snapshot imaging of a ms Parameters ---------- msname : str Name of the measurement set workdir : str Work directory name datacolumn : str, optional Data column freqrange : str, optional Frequency range to image timerange : str, optional Time range to image imagedir : str, optional Image directory name (default: workdir). Images, models, residuals will be saved in directories named images. models, residuals inside imagedir imsize : int, optional Image size in pixels cellsize : float, optional Cell size in arcseconds image_freqres : float, optional Frequency resolution of image in MHz image_timeres : float, optional Time resolution of image in seconds pol : str, optional Stokes parameters to image weight : str, optional Image weighting scheme robust : float, optional Briggs weighting robustness parameter minuv : float, optional Minimum UV-lambda to be used in imaging threshold : float, optional CLEAN threshold use_multiscale : bool, optional Use multiscale or not use_solar_mask : bool, optional Use solar mask mask_radius : float, optional Mask radius in arcminute savemodel : bool, optional Save model images or not saveres : bool, optional Save residual images or not cutout_rsun : float, optional Cutout image size in solar radii from center (default: 10.0 solar radii) make_plots : bool, optional Make radio map helioprojective plots logfile : str, optional Log file name ncpu : int, optional Number of CPU threads to use mem : float, optional Memory in GB to use Returns ------- int Success message list List of images [[images],[models],[residuals]] """ ncpu = max(1, ncpu) mem = abs(mem) if os.path.exists(logfile): os.system(f"rm -rf {logfile}") img_logger, logfile = create_logger( os.path.basename(logfile).split(".log")[0], logfile, ) sub_observer = None if os.path.exists(f"{workdir}/.jobname_password.npy") and logfile is not None: time.sleep(5) jobname, password = np.load( f"{workdir}/.jobname_password.npy", allow_pickle=True ) if os.path.exists(logfile): sub_observer = init_logger( "remotelogger_imaging_{os.path.basename(msname).split('.ms')[0]}", logfile, jobname=jobname, password=password, ) try: msname = msname.rstrip("/") msname = os.path.abspath(msname) img_logger.info(f"Perform imaging for {os.path.basename(msname)}") ######### # Imaging ######### msmd = msmetadata() msmd.open(msname) msmd.meanfreq(0, unit="MHz") freqs = msmd.chanfreqs(0, unit="MHz") freqres = msmd.chanres(0, unit="MHz")[0] times = msmd.timesforspws(0) timeres = np.nanmin(np.diff(times)) npol = msmd.ncorrforpol()[0] msmd.close() ################################################### # Whether calibration solutions were applied or not ################################################### if os.path.exists(f"{msname}/.nocal"): cal_sol = False img_logger.warning("No calibrator solutions applied.") else: cal_sol = True img_logger.debug("Calibration solutions applied") #################################### # Whether pol-selfcal is done or not #################################### if os.path.exists(f"{msname}/.nopolselfcal"): pol_selfcal = False img_logger.warning("Polarisation self-calibration is not done.") else: img_logger.warning("Polarisation self-calibration is done.") pol_selfcal = True ################################### # Finding channel and time ranges ################################### if freqrange != "": start_chans = [] end_chans = [] freq_list = freqrange.split(",") for f in freq_list: start_freq = float(f.split("~")[0]) end_freq = float(f.split("~")[-1]) if start_freq >= np.nanmin(freqs) and end_freq <= np.nanmax(freqs): start_chan = np.argmin(np.abs(start_freq - freqs)) end_chan = np.argmin(np.abs(end_freq - freqs)) start_chans.append(start_chan) end_chans.append(end_chan) else: start_chans = [0] end_chans = [len(freqs)] if len(start_chans) == 0: img_logger.critical( f"Please provide valid channel range between 0 and {len(freqs)}" ) time.sleep(5) clean_shutdown(sub_observer) return 1, [] if timerange != "": start_times = [] end_times = [] time_list = timerange.split(",") for timerange in time_list: start_time = timestamp_to_mjdsec(timerange.split("~")[0]) end_time = timestamp_to_mjdsec(timerange.split("~")[-1]) if start_time >= np.nanmin(times) and end_time <= np.nanmax(times): start_times.append(np.argmin(np.abs(times - start_time))) end_times.append(np.argmin(np.abs(times - end_time))) else: start_times = [0] end_times = [len(times)] if len(start_times) == 0: img_logger.critical( f"Please provide valid time range between {mjdsec_to_timestamp(times[0])} and {mjdsec_to_timestamp(times[-1])}" ) time.sleep(5) clean_shutdown(sub_observer) return 1, [] if npol < 4 and pol == "IQUV": pol = "I" prefix = workdir + "/imaging_" + os.path.basename(msname).split(".ms")[0] if imagedir == "": imagedir = workdir os.makedirs(imagedir, exist_ok=True) if weight == "briggs": weight += " " + str(robust) if threshold <= 1: threshold = 1.1 uvtaper = calc_uvtaper(msname) _, maxuv = calc_maxuv(msname) maxuv = round(maxuv, 1) taper = round(max(0, maxuv - uvtaper), 1) wsclean_args = [ "-quiet", f"-scale {cellsize}asec", f"-size {imsize} {imsize}", "-no-dirty", "-gridder wgridder", f"-weight {weight}", f"-name {prefix}", f"-pol {pol}", "-niter 10000", "-mgain 0.85", "-nmiter 5", "-gain 0.1", f"-minuv-l {minuv}", f"-maxuv-l {maxuv}", f"-j {ncpu}", f"-abs-mem {round(mem, 2)}", f"-auto-threshold 1 -auto-mask {threshold}", "-no-update-model-required", ] if taper > 0: wsclean_args.append(f"-taper-tukey {taper}") if datacolumn != "CORRECTED_DATA" and datacolumn != "corrected": wsclean_args.append(f"-data-column {datacolumn}") ngrid = max(1, int(ncpu / 2)) if ngrid > 1: wsclean_args.append(f"-parallel-gridding {ngrid}") if pol == "I": wsclean_args.append("-no-negative") ################################################ # Creating and using a solar mask ################################################ if use_solar_mask: fits_mask = prefix + "_solar-mask.fits" if not os.path.exists(fits_mask): img_logger.debug( f"Creating solar mask of radius: {mask_radius} arcmin.\n", ) fits_mask = create_circular_mask( msname, cellsize, imsize, mask_radius=mask_radius ) if fits_mask is not None and os.path.exists(fits_mask): wsclean_args.append("-fits-mask " + fits_mask) final_list_dic = {"image": [], "model": [], "residual": []} for i in range(len(start_chans)): for j in range(len(start_times)): temp_wsclean_args = copy.deepcopy(wsclean_args) temp_wsclean_args.append( f"-channel-range {start_chans[i]} {end_chans[i]}" ) temp_wsclean_args.append(f"-interval {start_times[j]} {end_times[j]}") ##################################### # Spectral imaging configuration ##################################### if image_freqres > 0: nchan = max(1, int(image_freqres / freqres)) chan_chunk = int((end_chans[i] - start_chans[i]) / nchan) if chan_chunk > 1: temp_wsclean_args.append(f"-channels-out {chan_chunk}") temp_wsclean_args.append("-no-mf-weighting") ##################################### # Temporal imaging configuration ##################################### if image_timeres > 0: ntime = max(1, int(image_timeres / timeres)) time_chunk = int((end_times[j] - start_times[j]) / ntime) if time_chunk > 1: temp_wsclean_args.append(f"-intervals-out {time_chunk}") ###################################### # Multiscale configuration ###################################### if use_multiscale: num_pixel_in_psf = calc_npix_in_psf(weight, robust=robust) chan_number = int((start_chans[i] + end_chans[i]) / 2) freqMHz = freqs[chan_number] sun_dia = calc_sun_dia(freqMHz) # Sun diameter in arcmin sun_rad = sun_dia / 2 multiscale_scales = calc_multiscale_scales( msname, num_pixel_in_psf, chan_number=chan_number, max_scale=sun_rad, ) temp_wsclean_args.append("-multiscale") temp_wsclean_args.append("-multiscale-gain 0.1") temp_wsclean_args.append( "-multiscale-scales " + ",".join([str(s) for s in multiscale_scales]) ) mid_freq = np.nanmean( freqs[int(start_chans[i]) : int(end_chans[i])] ) scale_bias = get_multiscale_bias(mid_freq) temp_wsclean_args.append(f"-multiscale-scale-bias {scale_bias}") if imsize >= 1024 and 4 * max(multiscale_scales) < 512: temp_wsclean_args.append("-parallel-deconvolution 512") elif imsize >= 1024: temp_wsclean_args.append("-parallel-deconvolution 512") ###################################### # Running imaging ###################################### wsclean_cmd = "wsclean " + " ".join(temp_wsclean_args) + " " + msname img_logger.info( f"{wsclean_cmd}", ) msg = run_wsclean(wsclean_cmd, "paircarswsclean", verbose=False) if msg == 0: os.system("rm -rf " + prefix + "*psf.fits") ###################### # Making stokes cubes ###################### pollist = [i.upper() for i in list(pol)] if len(pollist) == 1: imagelist = sorted(glob.glob(prefix + "*image.fits")) if not savemodel: os.system("rm -rf " + prefix + "*model.fits") else: modellist = sorted(glob.glob(prefix + "*model.fits")) if not saveres: os.system("rm -rf " + prefix + "*residual.fits") else: reslist = sorted(glob.glob(prefix + "*residual.fits")) else: imagelist = [] stokeslist = [] for p in pollist: stokeslist.append( sorted(glob.glob(prefix + "*" + p + "-image.fits")) ) for i in range(len(stokeslist[0])): wsclean_images = sorted( [stokeslist[k][i] for k in range(len(pollist))] ) image_prefix = os.path.basename(wsclean_images[0]).split( "-image" )[0] image_cube = make_stokes_wsclean_imagecube( wsclean_images, image_prefix + f"_{pol}_image.fits", keep_wsclean_images=False, ) imagelist.append(image_cube) del stokeslist if not savemodel: os.system("rm -rf " + prefix + "*model.fits") else: modellist = [] stokeslist = [] for p in pollist: stokeslist.append( sorted(glob.glob(prefix + f"*{p}*model.fits")) ) for i in range(len(stokeslist[0])): wsclean_models = sorted( [stokeslist[k][i] for k in range(len(pollist))] ) model_prefix = os.path.basename( wsclean_models[0] ).split("-model")[0] model_cube = make_stokes_wsclean_imagecube( wsclean_models, model_prefix + f"_{pol}_model.fits", keep_wsclean_images=False, ) modellist.append(model_cube) del stokeslist if not saveres: os.system("rm -rf " + prefix + "*residual.fits") else: reslist = [] stokeslist = [] for p in pollist: stokeslist.append( sorted(glob.glob(prefix + f"*{p}*residual.fits")) ) for i in range(len(stokeslist[0])): wsclean_residuals = sorted( [stokeslist[k][i] for k in range(len(pollist))] ) res_prefix = os.path.basename( wsclean_residuals[0] ).split("-residual")[0] residual_cube = make_stokes_wsclean_imagecube( wsclean_residuals, res_prefix + f"_{pol}_residual.fits", keep_wsclean_images=False, ) reslist.append(residual_cube) del stokeslist ###################### # Renaming images ###################### if len(imagelist) > 0: img_logger.info(f"Total {len(imagelist)} images are made.") img_logger.info("Renaming and making plots.") os.makedirs(imagedir + "/images", exist_ok=True) final_image_list = [] for imagename in imagelist: renamed_image = rename_mwasolar_image( imagename, imagetype="image", imagedir=imagedir + "/images", pol=pol, cutout_rsun=cutout_rsun, make_plots=make_plots, pol_selfcal=pol_selfcal, cal_sol=cal_sol, ) if renamed_image is not None: final_image_list.append(renamed_image) final_list_dic["image"] = final_image_list if savemodel and len(modellist) > 0: final_model_list = [] os.makedirs(imagedir + "/models", exist_ok=True) for modelname in modellist: renamed_model = rename_mwasolar_image( modelname, imagetype="model", imagedir=imagedir + "/models", pol=pol, cutout_rsun=cutout_rsun, make_plots=False, pol_selfcal=pol_selfcal, cal_sol=cal_sol, ) if renamed_model is not None: final_model_list.append(renamed_model) final_list_dic["model"] = final_model_list if saveres and len(reslist) > 0: final_res_list = [] os.makedirs(imagedir + "/residuals", exist_ok=True) for resname in reslist: renamed_res = rename_mwasolar_image( resname, imagetype="residual", imagedir=imagedir + "/residuals", pol=pol, cutout_rsun=cutout_rsun, make_plots=False, pol_selfcal=pol_selfcal, ) if renamed_res is not None: final_res_list.append(renamed_res) final_list_dic["residual"] = final_res_list if os.path.exists(f"{imagedir}/images/dask-scratch-space"): os.system(f"rm -rf {imagedir}/images/dask-scratch-space") if use_solar_mask and os.path.exists(fits_mask): os.system("rm -rf " + fits_mask) if len(final_list_dic["image"]) == 0: img_logger.error( "No image is made.", ) time.sleep(5) clean_shutdown(sub_observer) return 1, final_list_dic else: img_logger.info( "Imaging is successfully done.", ) time.sleep(5) clean_shutdown(sub_observer) return 0, final_list_dic else: if use_solar_mask and os.path.exists(fits_mask): os.system("rm -rf " + fits_mask) img_logger.critical( "No image is made.", ) time.sleep(5) clean_shutdown(sub_observer) return 1, {} except Exception: img_logger.exception( "Exception occured in imaging: {os.path.basename(msname)}", exc_info=True ) time.sleep(5) clean_shutdown(sub_observer) return 1, {} finally: time.sleep(5) drop_cache(msname)
[docs] def run_all_imaging( mslist, dask_client, workdir="", outdir="", freqrange="", timerange="", datacolumn="CORRECTED_DATA", freqres=-1, timeres=-1, weight="briggs", robust=0.0, minuv=0, pol="I", threshold=1.0, use_multiscale=True, use_solar_mask=True, imaging_params={}, # TODO savemodel=False, saveres=False, cutout_rsun=10.0, make_plots=True, n_threads=1, mem_limit=1, logger=None, ): """ Run spectropolarimetric snapshot imaging on a list of measurement sets Parameters ---------- mslist : list Measurement set list dask_client : dask.client Dask client workdir : str Work directory outdir : str Output directory freqrange : str, optional Frequency range to image timerange : str, optional Time range datacolumn : str, optional Data column freqres : float, optional Frequency resolution of spectral chunk in MHz timeres : float, optional Time resolution of temporal chunk in seconds weight : str, optional Image weighting robust : float, optional Briggs weighting robust parameter minuv : float, optional Minimum UV-lambda to use in imaging pol : str, optional Stokes parameters to image threshold : float, optional CLEAN threshold use_multiscale : bool, optional Use multiscale or not use_solar_mask : bool, optional Use solar mask savemodel : bool, optional Save model images or not saveres : bool, optional Save residual images or not cutout_rsun : float, optional Cutout image size (width and height is : 2 times cutout_rsun) Default value: 10 solar radii Note: default FoV is 20 solar solar radii. If cutout_rsun is chosen larger than 20 solar radii, FoV will be increased accordingly. make_plots : bool, optional Make radio image helioprojective plots n_threads : int, optional CPU threads to use mem_limit : float, optional Memory to use in GB Returns ------- int Success message int Succeeded ms number int Failed ms number int Total images """ if logger is None: logger = get_logger_safe() mslist = sorted(mslist) if len(mslist) == 0: logger.critical("Please provide a valid measurement set list.") return 1, 0, 0 else: succeed = 0 failed = len(mslist) total_images = 0 try: if weight == "briggs": weight_str = f"{weight}_{robust}" else: weight_str = weight if freqres == -1 and timeres == -1: imagedir = outdir + f"/imagedir_f_all_t_all_pol_{pol}_w_{weight_str}" elif freqres != -1 and timeres == -1: imagedir = outdir + f"/imagedir_f_{freqres}_t_all_pol_{pol}_w_{weight_str}" elif freqres == -1 and timeres != -1: imagedir = outdir + f"/imagedir_f_all_t_{timeres}_pol_{pol}_w_{weight_str}" else: imagedir = ( outdir + f"/imagedir_f_{freqres}_t_{timeres}_pol_{pol}_w_{weight_str}" ) os.makedirs(imagedir, exist_ok=True) logger.debug(f"Image directory: {imagedir}") #################################### # Filtering any corrupted ms ##################################### filtered_mslist = [] # Filtering in case any ms is corrupted for ms in mslist: checkcol = check_datacolumn_valid(ms) if checkcol: filtered_mslist.append(ms) else: logger.warning(f"Issue in : {ms}") mslist = filtered_mslist if len(mslist) == 0: logger.critical("No valid measurement set is found.") return 1, succeed, failed, total_images cutout_rsun = max( 5, cutout_rsun ) # Minimum 5 solar radii cutout, default is 10 solar radii tasks = [] for i in range(len(mslist)): ms = mslist[i] num_pixel_in_psf = calc_npix_in_psf(weight, robust=robust) cellsize = calc_cellsize(ms, num_pixel_in_psf) instrument_fov = calc_field_of_view(ms, FWHM=False) cutout_rsun_arcsec = cutout_rsun * 16 * 60 fov = min(instrument_fov, 2 * cutout_rsun_arcsec) imsize = int(fov / cellsize) imsize = get_fft_size(imsize) os.makedirs(workdir + "/logs", exist_ok=True) obsid = get_MWA_OBSID(ms) coarse_chan = get_MWA_coarse_chan(ms) if len(coarse_chan) > 1: coarse_chan = f"{min(coarse_chan)}-{max(coarse_chan)}" else: coarse_chan = f"{min(coarse_chan)}" logfile = f"{workdir}/logs/imaging_{obsid}_ch_{coarse_chan}.log" tasks.append( delayed(perform_imaging)( msname=ms, workdir=workdir, freqrange=freqrange, timerange=timerange, datacolumn=datacolumn, imagedir=imagedir, imsize=imsize, cellsize=cellsize, image_freqres=freqres, image_timeres=timeres, pol=pol, weight=weight, robust=robust, minuv=minuv, threshold=threshold, use_multiscale=use_multiscale, use_solar_mask=use_solar_mask, savemodel=savemodel, saveres=saveres, cutout_rsun=cutout_rsun, make_plots=make_plots, ncpu=n_threads, mem=mem_limit, logfile=logfile, ) ) results = list(dask_client.gather(dask_client.compute(tasks))) all_image_list = [] all_imaged_ms_list = [] for i in range(len(results)): r = results[i][1] if len(r) == 0: logger.error( f"Imaging failed for ms : {mslist[i]}", ) else: image_list = r["image"] if len(image_list) == 0: logger.error( f"No image is made for ms : {mslist[i]}", ) else: all_imaged_ms_list.append(mslist[i]) for image in image_list: all_image_list.append(image) succeed = len(all_imaged_ms_list) failed = len(mslist) - succeed total_images = len(all_image_list) logger.info( f"Numbers of input measurement sets : {len(mslist)}.", ) logger.info( f"Imaging successfully done for: {succeed} measurement sets.", ) logger.info( f"Imaging failed for: {failed} measurement sets.", ) logger.info(f"Total images made: {total_images}.") return 0, succeed, failed, total_images except Exception: logger.exception("Exception occured in imaging.", exc_info=True) return 1, succeed, failed, total_images
[docs] def main( mslist, workdir, outdir, freqrange="", timerange="", datacolumn="CORRECTED_DATA", pol="I", freqres=-1, timeres=-1, weight="briggs", robust=0.0, minuv=0, threshold=1.0, cutout_rsun=10.0, use_multiscale=True, use_solar_mask=True, savemodel=True, saveres=True, make_plots=True, start_remote_log=False, cpu_frac=0.8, mem_frac=0.8, logfile=None, jobid=0, verbose=False, dask_client=None, ): """ Perform distributed spectropolarimetric snapshot imaging on multiple measurement sets. Parameters ---------- mslist : str Comma-separated list of measurement set paths to be imaged. workdir : str Directory for intermediate files, logs, and temporary outputs. outdir : str Directory where final images, models, and plots will be saved. freqrange : str, optional Frequency range to image (e.g., "500~1000MHz"). Default is "" (all frequencies). timerange : str, optional Time range to image (e.g., "09:30:00~09:40:00"). Default is "" (all times). datacolumn : str, optional Data column to image (e.g., "DATA", "CORRECTED_DATA"). Default is "CORRECTED_DATA". pol : str, optional Polarization product to image (e.g., "I", "XX", "RR", "QUV"). Default is "I". freqres : float, optional Frequency resolution in MHz for slicing the MS. Use -1 to disable. Default is -1. timeres : float, optional Time resolution in seconds for snapshot imaging. Use -1 to disable. Default is -1. weight : str, optional Weighting scheme for imaging ("natural", "uniform", "briggs"). Default is "briggs". robust : float, optional Robustness parameter for Briggs weighting. Default is 0.0. minuv : float, optional Minimum uv-distance (in wavelengths) to include in imaging. Default is 0.0. threshold : float, optional Cleaning threshold in sigma. Default is 1.0. cutout_rsun : float, optional Radius in solar radii to cut out around solar center. Default is 10.0. use_multiscale : bool, optional If True, enables multiscale CLEAN deconvolution. Default is True. use_solar_mask : bool, optional If True, applies a solar disk mask during CLEAN to reduce sidelobe artifacts. Default is True. savemodel : bool, optional If True, saves the CLEAN model images. Default is True. saveres : bool, optional If True, saves the residual images. Default is True. make_plots : bool, optional If True, generates diagnostic plots for each image. Default is True. start_remote_log : bool, optional Whether to enable remote logging using credentials in the workdir. Default is False. cpu_frac : float, optional Fraction of total CPUs to use per task. Default is 0.8. mem_frac : float, optional Fraction of total system memory to use per task. Default is 0.8. logfile : str, optional Log file jobid : int, optional Unique job identifier for logging and PID tracking. Default is 0. verbose : bool, optional Verbose logs dask_client : dask.client, optional Dask client Returns ------- int Success message int Succeeded ms number int Failed ms number int Total images """ logger = get_logger_safe() if verbose: logger.setLevel(logging.DEBUG) cpu_frac = min(0.8, abs(cpu_frac)) mem_frac = min(0.8, abs(mem_frac)) mslist = mslist.split(",") if workdir == "": workdir = os.path.dirname(os.path.abspath(mslist[0])) + "/workdir" os.makedirs(workdir, exist_ok=True) os.chdir(workdir) logger.debug(f"Current working directory: {os.getcwd()}") if outdir == "" or not os.path.exists(outdir): outdir = workdir outdir = outdir.rstrip("/") os.makedirs(outdir, exist_ok=True) logger.debug(f"Output image directory: {outdir}") if len(mslist) == 0: logger.critical("Please provide a valid measurement set list.") return 1, 0, 0, 0 else: succeed = 0 failed = len(mslist) total_images = 0 ########################### # WSClean container ########################### container_name = "paircarswsclean" container_present = check_udocker_container(container_name) if not container_present: logger.debug(f"Initializing {container_name}.") container_name = initialize_wsclean_container(name=container_name, verbose=True) if container_name is None: logger.critical( f"Container {container_name} is not initiated. First initiate container and then run." ) return 1, succeed, failed, total_images ############ # Logger ############ observer = None if ( start_remote_log and os.path.exists(f"{workdir}/.jobname_password.npy") and logfile is not None ): time.sleep(5) jobname, password = np.load( f"{workdir}/.jobname_password.npy", allow_pickle=True ) if os.path.exists(logfile): observer = init_logger( "all_imaging", logfile, jobname=jobname, password=password ) if observer is None: logger.info( "Remote link or jobname is blank. Not transmiting to remote logger." ) total_ncoarse = 0 for msname in mslist: ncoarse = get_ncoarse(msname) total_ncoarse += ncoarse total_ncoarse = max(1, total_ncoarse) logger.debug(f"Total coarse channels: {total_ncoarse}.") dask_cluster = None if dask_client is None: dask_client, dask_cluster, dask_dir, nworker = get_local_dask_cluster( workdir, cpu_frac=cpu_frac, mem_frac=mem_frac, max_worker=len(mslist) + 1, ) if dask_client is None: logger.critical("Error occured in creating local cluster.") return 1, succeed, failed, total_images scale_worker_and_wait(dask_cluster, dask_client, nworker) try: for banner in print_banner("Starting imaging.", no_print=True).splitlines(): logger.info(banner) client_info = dask_client.scheduler_info()["workers"] njobs = len(client_info) worker_mem_list = [] for addr, w in client_info.items(): worker_mem_list.append(w["memory_limit"] / 1024**3) if len(worker_mem_list) > 0: mem_limit = round(min(worker_mem_list), 3) else: mem_limit = 1 n_threads = os.environ.get("OMP_NUM_THREADS") if n_threads is not None: n_threads = int(n_threads) else: n_threads = 1 logger.info("#################################") logger.info(f"Total dask worker: {njobs}") logger.info(f"CPU per worker: {n_threads}") logger.info(f"Memory per worker: {mem_limit} GB") logger.info("#################################") msg, succeed, failed, total_images = run_all_imaging( mslist, dask_client, workdir=workdir, outdir=outdir, freqrange=freqrange, timerange=timerange, datacolumn=datacolumn, freqres=freqres, timeres=timeres, weight=weight, robust=robust, minuv=minuv, threshold=threshold, use_multiscale=use_multiscale, use_solar_mask=use_solar_mask, pol=pol, make_plots=make_plots, cutout_rsun=cutout_rsun, savemodel=savemodel, saveres=saveres, n_threads=n_threads, mem_limit=mem_limit, logger=logger, ) except Exception: logger.exception("Exception occured in imaging.", exc_info=True) msg = 1 finally: time.sleep(5) clean_shutdown(observer) for ms in mslist: drop_cache(ms) if dask_cluster is not None: dask_client.shutdown() dask_client.close() dask_cluster.close() drop_cache(workdir) drop_cache(outdir) os.system(f"rm -rf {dask_dir}") return msg, succeed, failed, total_images
[docs] def cli(): parser = argparse.ArgumentParser( description="Perform spectropolarimetric snapshot imaging", formatter_class=SmartDefaultsHelpFormatter, ) # Essential parameters basic_args = parser.add_argument_group( "###################\nEssential parameters\n###################" ) basic_args.add_argument( "mslist", type=str, help="Comma-separated list of measurement sets (required)", ) basic_args.add_argument( "--workdir", type=str, required=True, default="", help="Work directory for imaging", ) basic_args.add_argument( "--outdir", type=str, default="", help="Output directory for imaging products", ) # Advanced parameters adv_args = parser.add_argument_group( "###################\nAdvanced imaging parameters\n###################" ) adv_args.add_argument( "--freqrange", type=str, default="", help="Frequency range to image", ) adv_args.add_argument( "--timerange", type=str, default="", help="Time range to image", ) adv_args.add_argument( "--datacolumn", type=str, default="CORRECTED_DATA", help="Data column to use for imaging", ) adv_args.add_argument( "--pol", type=str, default="I", help="Stokes parameters to image", ) adv_args.add_argument( "--freqres", type=float, default=-1, help="Frequency resolution per chunk in MHz (-1 for full)", ) adv_args.add_argument( "--timeres", type=float, default=-1, help="Time resolution per chunk in seconds (-1 for full)", ) adv_args.add_argument( "--weight", type=str, default="briggs", help="Imaging weighting scheme", ) adv_args.add_argument( "--robust", type=float, default=0.0, help="Briggs robust parameter", ) adv_args.add_argument( "--minuv_l", dest="minuv", type=float, default=0, help="Minimum UV distance in wavelengths", ) adv_args.add_argument( "--threshold", type=float, default=1.0, help="CLEAN threshold in sigma", ) adv_args.add_argument( "--cutout_rsun", type=float, default=10.0, help="Cutout radius for images (solar radii)", ) adv_args.add_argument( "--no_multiscale", action="store_false", dest="use_multiscale", help="Do not use multiscale CLEAN", ) adv_args.add_argument( "--no_solar_mask", action="store_false", dest="use_solar_mask", help="Do not use solar disk mask for CLEANing", ) adv_args.add_argument( "--no_savemodel", action="store_false", dest="savemodel", help="Do no save model images", ) adv_args.add_argument( "--no_saveres", action="store_false", dest="saveres", help="Do not save residual images", ) adv_args.add_argument( "--no_make_plots", action="store_false", dest="make_plots", help="Do not generate helioprojective plots", ) # Resource management parameters hard_args = parser.add_argument_group( "###################\nHardware resource management parameters\n###################" ) hard_args.add_argument( "--cpu_frac", type=float, default=0.8, help="Fraction of available CPU to use", ) hard_args.add_argument( "--mem_frac", type=float, default=0.8, help="Fraction of available memory to use", ) hard_args.add_argument( "--jobid", type=str, default="0", help="Job ID for process tracking and logging", ) if len(sys.argv) == 1: parser.print_help(sys.stderr) return 1 args = parser.parse_args() msg, _, _, _ = main( mslist=args.mslist, workdir=args.workdir, outdir=args.outdir, freqrange=args.freqrange, timerange=args.timerange, datacolumn=args.datacolumn, pol=args.pol, freqres=args.freqres, timeres=args.timeres, weight=args.weight, robust=args.robust, minuv=args.minuv, threshold=args.threshold, cutout_rsun=args.cutout_rsun, use_multiscale=args.use_multiscale, use_solar_mask=args.use_solar_mask, savemodel=args.savemodel, saveres=args.saveres, make_plots=args.make_plots, cpu_frac=float(args.cpu_frac), mem_frac=float(args.mem_frac), jobid=args.jobid, ) return msg