Source code for paircars.utils.resource_utils

import shutil
import traceback
import platform
import ctypes
import tempfile
import os
from contextlib import contextmanager

POSIX_FADV_DONTNEED = 4
libc = ctypes.CDLL("libc.so.6")


#####################################
# Resource management
#####################################
[docs] def drop_file_cache(filepath, verbose=False): """ Advise the OS to drop the given file from the page cache. Safe, per-file, no sudo required. """ if platform.system() != "Linux": raise NotImplementedError("drop_file_cache is only supported on Linux") try: if os.path.exists(filepath) is False: return if not os.path.isfile(filepath): return fd = os.open(filepath, os.O_RDONLY) result = libc.posix_fadvise(fd, 0, 0, POSIX_FADV_DONTNEED) os.close(fd) if verbose: if result == 0: print(f"[cache drop] Released: {filepath}") else: print(f"[cache drop] Failed ({result}) for: {filepath}") except Exception as e: if verbose: print(f"[cache drop] Error for {filepath}: {e}") traceback.print_exc()
[docs] def drop_cache(path, verbose=False): """ Drop file cache for a file or all files under a directory. Parameters ---------- path : str File or directory path """ if platform.system() != "Linux": raise NotImplementedError("drop_file_cache is only supported on Linux") if os.path.isfile(path): drop_file_cache(path, verbose=verbose) elif os.path.isdir(path): for root, _, files in os.walk(path): for f in files: full_path = os.path.join(root, f) drop_file_cache(full_path, verbose=verbose) else: if verbose: print(f"[cache drop] Path does not exist or is not valid: {path}")
[docs] def has_space(path, required_gb): try: stat = shutil.disk_usage(path) free_space = round(stat.free / 1024**3, 1) print(f"Free space: {free_space}GB") return free_space >= required_gb except BaseException: return False
[docs] @contextmanager def shm_or_tmp(required_gb, workdir, prefix="solar_", verbose=False): """ Create a temporary working directory: 1. Try /dev/shm if it has required space 2. Else TMPDIR if set and has space 4. Else work directory Temporarily sets TMPDIR to the selected path during the context. Cleans up after use. Parameters ---------- required_gb : float Required disk space in GB workdir : str Fall back work directory prefix : str, optional Temp directory prefix verbose : bool, optional Verbose """ candidates = [] if has_space("/dev/shm", required_gb): candidates.append("/dev/shm") tmpdir_env = os.environ.get("TMPDIR") if tmpdir_env is not None and has_space(tmpdir_env, required_gb): candidates.append(tmpdir_env) for i in range(len(candidates)): base_dir = candidates[i] try: temp_dir = tempfile.mkdtemp(dir=base_dir, prefix=prefix) if verbose: if i == 0: print("Using RAM") elif i == 1: print("Using {os.environ.get('TMPDIR')}") break except Exception as e: print(f"[shm_or_tmp] Failed to create temp dir in {base_dir}: {e}") else: raise RuntimeError( "Could not create a temporary directory in any fallback location." ) # Override TMPDIR old_tmpdir = os.environ.get("TMPDIR") os.environ["TMPDIR"] = temp_dir try: yield temp_dir finally: # Restore TMPDIR if old_tmpdir is not None: os.environ["TMPDIR"] = old_tmpdir else: os.environ.pop("TMPDIR", None) # Clean up the temp directory try: shutil.rmtree(temp_dir) except Exception as e: print(f"[cleanup] Warning: could not delete {temp_dir}: {e}")
[docs] @contextmanager def tmp_with_cache_rel(required_gb, workdir, prefix="solar_", verbose=False): """ Combined context manager: - Uses shm_or_tmp() for workspace - Drops kernel page cache for all files in that directory on exit Parameters ---------- required_gb : float Required disk space in GB workdir : str Fall back work directory prefix : str, optional Temp directory prefix verbose : bool, optional Verbose """ with shm_or_tmp(required_gb, workdir, prefix=prefix, verbose=verbose) as tempdir: try: yield tempdir finally: if platform.system() == "Linux": drop_cache(tempdir)
[docs] def limit_threads(n_threads=-1): """ Limit number of threads usuage Parameters ---------- n_threads : int, optional Number of threads """ if n_threads > 0: os.environ["OMP_NUM_THREADS"] = str(n_threads) os.environ["OPENBLAS_NUM_THREADS"] = str(n_threads) os.environ["MKL_NUM_THREADS"] = str(n_threads) os.environ["VECLIB_MAXIMUM_THREADS"] = str(n_threads) os.environ["NUMEXPR_NUM_THREADS"] = str(n_threads) os.environ["RAYON_NUM_THREADS"] = str(n_threads)