# Source code for paircars.utils.killjob_utils

import psutil
import numpy as np
import time
import os
import signal
import traceback
import subprocess
from distributed import Client
from .basic_utils import get_cachedir, check_port_status
from .resource_utils import drop_cache


def kill_port(port):
    """
    Kill every process that holds a socket on a port.

    The port is only acted on when ``check_port_status(port)`` returns
    ``False``. Owning PIDs are found with ``lsof -t -i:<port>`` and each is
    sent ``SIGKILL``. The calling process itself is never killed.

    Parameters
    ----------
    port : int
        Port number
    """
    if check_port_status(port) is not False:
        return
    print(f"Closing port : {port}.")
    try:
        result = subprocess.run(
            ["lsof", "-t", f"-i:{port}"],
            capture_output=True,
            text=True,
        )
    except FileNotFoundError:
        # Without lsof there is no way to discover which PIDs own the port.
        print(f"Could not close port {port}: 'lsof' command not found.")
        return
    own_pid = os.getpid()
    # dict.fromkeys de-duplicates PIDs while preserving lsof's order.
    for pid in dict.fromkeys(int(token) for token in result.stdout.split()):
        if pid == own_pid:
            # lsof also reports sockets owned by the caller (e.g. an open
            # client connection to this port); never SIGKILL ourselves.
            continue
        try:
            os.kill(pid, signal.SIGKILL)
        except ProcessLookupError:
            # The process exited between the lsof listing and this kill.
            pass
        except PermissionError:
            print(f"Permission denied while killing PID {pid} on port {port}.")
def terminate_process_and_children(pid, grace_period=3.0):
    """
    Try to gracefully terminate a process tree, then force kill if needed.

    ``terminate()`` is sent to every (recursive) child of ``pid`` and to
    ``pid`` itself. Any of them still alive after ``grace_period`` seconds
    is sent ``kill()``. A ``pid`` that does not exist is silently ignored.

    Parameters
    ----------
    pid : int
        Process ID of the root of the tree
    grace_period : float, optional
        Seconds to wait after terminating before force-killing survivors
    """
    try:
        parent = psutil.Process(pid)
        children = parent.children(recursive=True)
    except (psutil.NoSuchProcess, ProcessLookupError):
        return
    procs = children + [parent]
    # Signal each process independently: if one (including the parent) has
    # already exited, the others must still be signalled, waited on, and
    # force-killed if they survive.
    for proc in procs:
        try:
            proc.terminate()
        except psutil.NoSuchProcess:
            pass
        except psutil.AccessDenied:
            print(f"Permission denied while terminating PID {proc.pid}.")
    # wait_procs treats processes that already exited as "gone".
    _, alive = psutil.wait_procs(procs, timeout=grace_period)
    for proc in alive:
        try:
            proc.kill()
        except psutil.NoSuchProcess:
            pass
        except psutil.AccessDenied:
            print(f"Permission denied while killing PID {proc.pid}.")
def kill_localscheduler(jobid):
    """
    Kill local scheduler

    Reads ``<cachedir>/main_pids_<jobid>.txt``, shuts down the Dask cluster
    recorded there, terminates the main process and its children, and drops
    the caches of the recorded directories. Failures are reported on stdout
    (with traceback where available) instead of being raised.

    Parameters
    ----------
    jobid : int
        P-AIRCARS job ID
    """
    try:
        cachedir = get_cachedir()
        jobfile_name = f"{cachedir}/main_pids_{jobid}.txt"
        if not os.path.exists(jobfile_name):
            print(
                f"No P-AIRCARS job information available for job ID "
                f"{jobid}: {jobfile_name}"
            )
            return
        try:
            # Columns used: 1 = main PID, 2 = Dask scheduler address,
            # 3 = MS dir, 4 = work dir, 5 = output dir (column 0 unused here).
            # Presumably a single row: int() below fails on a multi-row file.
            results = np.loadtxt(jobfile_name, dtype="str", unpack=True)
            main_pid = int(results[1])
            scheduler_address = str(results[2])
            msdir = str(results[3])
            workdir = str(results[4])
            outdir = str(results[5])
        except Exception:
            print("Could not read job file.")
            traceback.print_exc()
            return
        try:
            print("Closing dask cluster....")
            # The context manager closes the client even if shutdown() raises.
            with Client(address=scheduler_address, timeout=30) as client:
                client.shutdown()
        except Exception:
            print(f"Dask cluster at: {scheduler_address} is already closed.")
        time.sleep(1)
        print(f"Attempting to terminate main PID: {main_pid}")
        terminate_process_and_children(main_pid)
        print("Dropping caches...")
        for path in (msdir, workdir, outdir, cachedir):
            drop_cache(path)
        print("Cleanup complete.")
        return
    except Exception:
        print(f"Error in killing P-AIRCARS job: {jobid}")
        traceback.print_exc()
        return
def kill_slurmscheduler(jobid):
    """
    Kill SLURM scheduler

    Reads ``<cachedir>/main_pids_<jobid>.txt``, shuts down the Dask cluster
    recorded there, cancels the main SLURM job with ``scancel``, and drops
    the caches of the recorded directories. Failures are reported on stdout
    (with traceback where available) instead of being raised.

    Parameters
    ----------
    jobid : int
        P-AIRCARS job ID
    """
    try:
        cachedir = get_cachedir()
        jobfile_name = f"{cachedir}/main_pids_{jobid}.txt"
        if not os.path.exists(jobfile_name):
            print(
                f"No P-AIRCARS job information available for job ID "
                f"{jobid}: {jobfile_name}"
            )
            return
        try:
            # Columns used: 1 = main SLURM job ID, 2 = Dask scheduler address,
            # 3 = MS dir, 4 = work dir, 5 = output dir (column 0 unused here).
            # Presumably a single row: int() below fails on a multi-row file.
            results = np.loadtxt(jobfile_name, dtype="str", unpack=True)
            main_jobid = int(results[1])
            scheduler_address = str(results[2])
            msdir = str(results[3])
            workdir = str(results[4])
            outdir = str(results[5])
        except Exception:
            print("Could not read job file.")
            traceback.print_exc()
            return
        try:
            print("Closing dask cluster....")
            # The context manager closes the client even if shutdown() raises.
            with Client(address=scheduler_address, timeout=30) as client:
                client.shutdown()
        except Exception:
            print(f"Dask cluster at: {scheduler_address} is already closed.")
        time.sleep(1)
        print(f"Attempting to terminate main slurm jobid: {main_jobid}")
        try:
            scancel = subprocess.run(["scancel", f"{main_jobid}"])
        except FileNotFoundError:
            # Keep going so the caches are still dropped below.
            print("Could not cancel slurm job: 'scancel' command not found.")
        else:
            if scancel.returncode != 0:
                print(
                    f"scancel exited with code {scancel.returncode} "
                    f"for slurm jobid: {main_jobid}"
                )
        print("Dropping caches...")
        for path in (msdir, workdir, outdir, cachedir):
            drop_cache(path)
        print("Cleanup complete.")
        return
    except Exception:
        print(f"Error in killing P-AIRCARS job: {jobid}")
        traceback.print_exc()
        return