Source code for paircars.utils.basic_utils

import numpy as np
import os
import copy
import time
import socket
import stat
import tempfile
import sys
import astropy.units as u
from io import StringIO
from astropy.time import Time
from astropy.coordinates import Angle
from contextlib import contextmanager


##########################
# Basic utility funactions
##########################
[docs] @contextmanager def capture_all_output(): old_stdout = sys.stdout old_stderr = sys.stderr stdout_buffer = StringIO() stderr_buffer = StringIO() sys.stdout = stdout_buffer sys.stderr = stderr_buffer try: yield stdout_buffer, stderr_buffer finally: sys.stdout = old_stdout sys.stderr = old_stderr
[docs] def internet_available(timeout=10): """ Check internet connection """ try: socket.create_connection(("8.8.8.8", 53), timeout=timeout) return True except OSError: return False
[docs] def check_permission(path): """ check read, write, execute permission of a file or directory Parameters ---------- path : str File or directory path Returns ------- bool Whether hand permission or not """ print(f"Checking permission for: {path}") if not os.path.exists(path): return False ########################### # READ TEST ########################### try: if os.path.isfile(path): with open(path, "rb") as f: f.read(1) elif os.path.isdir(path): os.listdir(path) else: return False except Exception: return False ########################### # WRITE TEST ########################### try: if os.path.isfile(path): with open(path, "a"): pass elif os.path.isdir(path): with tempfile.NamedTemporaryFile(dir=path, delete=True): pass except Exception: return False ########################### # EXECUTE TEST ########################### try: if os.path.isdir(path): if not os.access(path, os.X_OK): return False elif os.path.isfile(path): mode = os.stat(path).st_mode if not (mode & stat.S_IXUSR): return False except Exception: return False return True
[docs] @contextmanager def suppress_output(): """ Supress CASA terminal output """ with open(os.devnull, "w") as fnull: old_stdout = os.dup(1) old_stderr = os.dup(2) os.dup2(fnull.fileno(), 1) os.dup2(fnull.fileno(), 2) try: yield finally: os.dup2(old_stdout, 1) os.dup2(old_stderr, 2)
[docs] def get_cachedir(): """ Get cache directory """ homedir = os.path.expanduser("~") cachedir = f"{homedir}/.paircarspipe" os.makedirs(cachedir, exist_ok=True) return cachedir
[docs] def create_datadir(datadir=""): """ Create data directory Parameters ---------- datadir : str, optional User provided custom data directory """ cachedir = get_cachedir() if datadir == "": datadir = f"{cachedir}/paircarspipe_data" else: datadir = f"{datadir}/paircarspipe_data" os.makedirs(datadir, exist_ok=True) with open(f"{cachedir}/paircarspipe_data_dir.txt", "w") as f: f.write(str(datadir) + "\n") return
[docs] def get_datadir(): """ Get package data directory Returns ------- str Data directory """ cachedir = get_cachedir() if not os.path.exists(f"{cachedir}/paircarspipe_data_dir.txt"): return None with open(f"{cachedir}/paircarspipe_data_dir.txt", "r") as f: datadir = f.read().strip() os.makedirs(datadir, exist_ok=True) return datadir
[docs] def wait_for_port(host, port, timeout=60): """ Waiting for port Parameters ---------- port : int Port number timeout : int, optional Timeout in seconds """ start = time.time() while time.time() - start < timeout: try: with socket.create_connection((host, port), timeout=2): return True except OSError: time.sleep(2) return False
[docs] def get_free_port(start_port=4200, end_port=4300): """ Get free port Parameters ---------- start_port : int, optional Start port range end_port : int, optional End port range Returns ------- int Free port """ for port in range(start_port, end_port): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: try: s.bind(("127.0.0.1", port)) return port except OSError: continue
[docs] def check_port_status(port): """ Check port status Parameters ---------- port : int Port number Returns ------- int Port free status """ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: try: s.bind(("127.0.0.1", port)) return True except OSError: return False
[docs] def split_into_chunks(lst, target_chunk_size): """ Split a list into equal number of elements Parameters ---------- lst : list List of numbers target_chunk_size: int Number of elements per chunk Returns ------- list Chunked list """ n = len(lst) num_chunks = max(1, round(n / target_chunk_size)) avg_chunk_size = n // num_chunks remainder = n % num_chunks chunks = [] start = 0 for i in range(num_chunks): extra = 1 if i < remainder else 0 # Distribute remainder end = start + avg_chunk_size + extra chunks.append(lst[start:end]) start = end return chunks
[docs] def interpolate_nans(data): """Linearly interpolate NaNs in 1D array.""" from scipy.interpolate import interp1d nans = np.isnan(data) if np.sum(~nans) < 5: return data x = np.arange(len(data)) interp_func = interp1d( x[~nans], data[~nans], kind="linear", bounds_error=False, fill_value="extrapolate", ) return interp_func(x)
[docs] def average_timestamp(timestamps): """ Compute the average timestamp using astropy from a list of ISO 8601 strings. Parameters ---------- timestamps : list timestamps (list of str): List of timestamp strings in 'YYYY-MM-DDTHH:MM:SS' format. Returns -------- str Average timestamp in 'YYYY-MM-DDTHH:MM:SS' format. """ if len(timestamps) == 0: return "" times = Time(timestamps, format="isot", scale="utc") avg_time = Time(np.mean(times.jd), format="jd", scale="utc") return avg_time.isot.split(".")[0] # Strip milliseconds for clean output
[docs] def ceil_to_multiple(n, base): """ Round up to the next multiple Parameters ---------- n : float The number base : float Whose multiple will be Returns ------- float The modified number """ return ((n // base) + 1) * base
[docs] def average_with_padding(array, chanwidth, axis=0, pad_value=np.nan): """ Averages an array along a specified axis with a given chunk width (chanwidth), padding the array if its size along that axis is not divisible by chanwidth. Parameters ---------- array : ndarray Input array to average. chanwidth : int Width of chunks to average. axis : int Axis along which to perform the averaging. pad_value : float Value to pad with if padding is needed (default: np.nan). Returns -------- ndarray Array averaged along the specified axis. """ # Compute the shape along the specified axis original_size = array.shape[axis] pad_size = -original_size % chanwidth # If padding is needed, apply it directly along the target axis if pad_size > 0: pad_width = [(0, 0)] * array.ndim pad_width[axis] = (0, pad_size) array = np.pad(array, pad_width, constant_values=pad_value) # Compute the new shape and reshape the array for chunking new_shape = list(array.shape) new_shape[axis] = array.shape[axis] // chanwidth new_shape.insert(axis + 1, chanwidth) reshaped_array = array.reshape(new_shape) # Use nanmean along the chunk axis for averaging averaged_array = np.nanmean(reshaped_array, axis=axis + 1) return averaged_array
[docs] def filter_outliers(data, threshold=5, max_iter=3): """ Filter outliers and perform cubic spline fitting Parameters ---------- y : numpy.array Y values threshold : float Threshold of filtering max_iter : int Maximum number of iterations Returns ------- numpy.array Clean Y-values """ from scipy.ndimage import gaussian_filter1d for c in range(max_iter): data = np.asarray(data, dtype=np.float64) original_nan_mask = np.isnan(data) # Interpolate NaNs for smoothing interpolated_data = interpolate_nans(data) # Apply Gaussian smoothing smoothed = gaussian_filter1d( interpolated_data, sigma=threshold, truncate=3 * threshold ) # Compute residuals and std only on valid original data residuals = data - smoothed valid_mask = ~original_nan_mask std_dev = np.std(residuals[valid_mask]) if std_dev == 0.0: return data # Detect outliers outlier_mask = np.abs(residuals) > threshold * std_dev combined_mask = valid_mask & ~outlier_mask # Replace outliers with NaN filtered_data = np.where(combined_mask, data, np.nan) data = copy.deepcopy(filtered_data) return filtered_data
[docs] def weighted_mean(x, xerr): """ Calculate weighted mean Parameters ---------- x : numpy.array Data array xerr : numpy.array Error array Returns ------- float Weighted mean float Weighted error """ w = 1.0 / xerr**2 mean = np.nansum(w * x) / np.nansum(w) err = np.sqrt(1.0 / np.nansum(w)) return mean, err
[docs] def angular_separation_equatorial(ra1, dec1, ra2, dec2): """ Calculate angular seperation between two equatorial coordinates Parameters ---------- ra1 : float RA of the first coordinate in degree dec1 : float DEC of the first coordinate in degree ra2 : float RA of the second coordinate in degree dec2 : float DEC of the second coordinate in degree Returns ------- float Angular distance in degree """ # Convert RA and Dec from degrees to radians ra1 = np.radians(ra1) ra2 = np.radians(ra2) dec1 = np.radians(dec1) dec2 = np.radians(dec2) # Apply the spherical distance formula using NumPy functions cos_theta = np.sin(dec1) * np.sin(dec2) + np.cos(dec1) * np.cos(dec2) * np.cos( ra1 - ra2 ) # Calculate the angular separation in radians theta_rad = np.arccos(cos_theta) # Convert the angular separation from radians to degrees theta_deg = np.degrees(theta_rad) return round(theta_deg, 2)
[docs] def ra_dec_to_deg(ra_hms, dec_dms): """ Convert RA and Dec from hms and dms format to degrees Parameters ---------- ra_hms: str Right Ascension in 'hms' format dec_dms : str Declination in 'dms' format Returns ------- tuple RA and Dec in degrees """ ra = Angle(ra_hms, unit=u.hourangle) dec = Angle(dec_dms, unit=u.deg) return round(ra.deg, 4), round(dec.deg, 4)
[docs] def ra_dec_to_hms_dms(ra_deg, dec_deg): """ Convert RA and Dec in degrees to hms and dms format Parameters ---------- ra_deg : float Right Ascension in degrees. dec_deg : float Declination in degrees. Returns ------- tuple RA in h:m:s format, Dec in d:m:s format (e.g., '1h5m0s', '1d5m0s'). """ # Convert RA to h:m:s if ra_deg < 0: ra_deg += 360 ra = Angle(ra_deg, unit=u.deg) ra_hms = ra.to_string(unit=u.hourangle, sep=":").split(":") ra_hms = ra_hms[0] + "h" + ra_hms[1] + "m" + ra_hms[2] + "s" # Convert Dec to d:m:s dec = Angle(dec_deg, unit=u.deg) dec_dms = dec.to_string(unit=u.deg, sep=":").split(":") dec_dms = dec_dms[0] + "d" + dec_dms[1] + "m" + dec_dms[2] + "s" return ra_hms, dec_dms
[docs] def timestamp_to_mjdsec(timestamp, date_format=0): """ Convert timestamp to mjd second. Parameters ---------- timestamp : str Time stamp to convert date_format : int, optional Datetime string format 0: 'YYYY/MM/DD/hh:mm:ss' 1: 'YYYY-MM-DDThh:mm:ss' 2: 'YYYY-MM-DD hh:mm:ss' 3: 'YYYY_MM_DD_hh_mm_ss' Returns ------- float Return correspondong MJD second of the day """ import julian from datetime import datetime as dt if date_format == 0: try: timestamp_datetime = dt.strptime(timestamp, "%Y/%m/%d/%H:%M:%S.%f") except BaseException: timestamp_datetime = dt.strptime(timestamp, "%Y/%m/%d/%H:%M:%S") elif date_format == 1: try: timestamp_datetime = dt.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f") except BaseException: timestamp_datetime = dt.strptime(timestamp, "%Y-%m-%dT%H:%M:%S") elif date_format == 2: try: timestamp_datetime = dt.strptime(timestamp, "%Y-%m-%d %H:%M:%S.%f") except BaseException: timestamp_datetime = dt.strptime(timestamp, "%Y-%m-%d %H:%M:%S") elif date_format == 3: try: timestamp_datetime = dt.strptime(timestamp, "%Y_%m_%d_%H_%M_%S.%f") except BaseException: timestamp_datetime = dt.strptime(timestamp, "%Y_%m_%d_%H_%M_%S") else: print("No proper format of timestamp.\n") return mjd = float( "{: .2f}".format( (julian.to_jd(timestamp_datetime) - 2400000.5) * (24.0 * 3600.0) ) ) return mjd
[docs] def mjdsec_to_timestamp(mjdsec, str_format=0): """ Convert CASA MJD seceonds to CASA timestamp Parameters ---------- mjdsec : float CASA MJD seconds str_format : int Time stamp format (0: yyyy-mm-ddTHH:MM:SS.ff, 1: yyyy/mm/dd/HH:MM:SS.ff, 2: yyyy-mm-dd HH:MM:SS) Returns ------- str CASA time stamp in UTC at ISOT format """ from casatools import measures, quanta me = measures() qa = quanta() today = me.epoch("utc", "today") mjd = np.array(mjdsec) / 86400.0 today["m0"]["value"] = mjd hhmmss = qa.time(today["m0"], prec=8)[0] date = qa.splitdate(today["m0"]) qa.done() if str_format == 0: utcstring = "%s-%02d-%02dT%s" % ( date["year"], date["month"], date["monthday"], hhmmss, ) elif str_format == 1: utcstring = "%s/%02d/%02d/%s" % ( date["year"], date["month"], date["monthday"], hhmmss, ) else: utcstring = "%s-%02d-%02d %s" % ( date["year"], date["month"], date["monthday"], hhmmss, ) return utcstring