Source code for paircars.utils.prefect_setup_utils

import os
import shutil
import signal
import socket
import subprocess
import time
import traceback

import numpy as np
import toml
from dotenv import load_dotenv

from .basic_utils import get_cachedir, get_datadir
from .killjob_utils import kill_port
from .udocker_utils import run_postgres, kill_postgres


# === CONFIG ===
def prefect_config(port, postgres_port, scheduler_name="local"):
    """
    Build the prefect server configuration and persist it on disk

    The Postgres connection URL is read from ``postgres.url`` inside the data
    directory. All prefect related paths are placed below
    ``<cachedir>/prefect_<scheduler_name>``, and the resulting dictionary is
    stored there with ``np.save`` as ``prefect.config.npy``.

    Parameters
    ----------
    port : int
        Free port
    postgres_port : int
        Postgres server port (currently not read inside this function)
    scheduler_name : str, optional
        Scheduler name

    Returns
    -------
    str
        Configuration file name
    dict
        Configuration dictionary

    Notes
    -----
    When the Postgres URL file does not exist, a message is printed and
    ``None`` is returned instead of the two values above.
    """
    # Without a stored Postgres URL there is nothing to configure against
    url_file = f"{get_datadir()}/postgres.url"
    if not os.path.exists(url_file):
        print("Start postgres server first.")
        return

    # Per-scheduler directory layout
    cache_root = f"{get_cachedir()}/prefect_{scheduler_name}"
    os.makedirs(cache_root, exist_ok=True)
    home = f"{cache_root}/prefect_home"
    os.makedirs(home, exist_ok=True)

    with open(url_file, "r") as handle:
        db_url = handle.read().strip()

    storage_dir = os.path.join(home, "storage")
    os.makedirs(storage_dir, exist_ok=True)

    # The port is kept as a string because it is later passed on a command line
    server_port = f"{port}"
    node_name = socket.gethostname()

    config = {
        "CACHEDIR": cache_root,
        "PREFECT_HOME": home,
        "PREFECT_API_DATABASE_CONNECTION_URL": db_url,
        "LOG_FILE": os.path.join(home, "server.log"),
        "PROFILE_PATH": os.path.join(home, "profiles.toml"),
        "MEMO_PATH": os.path.join(home, "memo_store.toml"),
        "STORAGE": storage_dir,
        "ENV_FILE": os.path.join(cache_root, "paircars_prefect.env"),
        "SERVER_HOST": "0.0.0.0",
        "SERVER_PORT": server_port,
        "SERVER_URL": f"http://0.0.0.0:{server_port}/api",
        "NODE_URL": f"http://{node_name}:{server_port}/api",
        "SERVER_DASHBOARD": f"http://localhost:{server_port}/dashboard",
        "PROFILE_NAME": f"paircarspipe_{scheduler_name}",
        "PID_FILE": os.path.join(home, "server.pid"),
        "LOGGING_PATH": os.path.join(home, "logging.yml"),
        "PREFECT_SERVER_API_HOST": "127.0.0.1",
    }

    saved_to = f"{cache_root}/prefect.config.npy"
    np.save(saved_to, config)
    return saved_to, config
#################################### # Start and save ####################################
def write_prefect_profile(scheduler_name="local"):
    """
    Save prefect profile

    Loads the saved configuration of the given scheduler, then creates or
    updates the profile TOML file so that the scheduler profile is the active
    one and carries the API URL, prefect home and database connection URL.

    Parameters
    ----------
    scheduler_name : str, optional
        Scheduler name

    Returns
    -------
    str or bool
        Profile file, or ``False`` if the configuration file does not exist
    """
    cachedir = f"{get_cachedir()}/prefect_{scheduler_name}"
    os.makedirs(cachedir, exist_ok=True)
    config_file = f"{cachedir}/prefect.config.npy"
    if not os.path.exists(config_file):
        print(f"Configuration file for job ID: {scheduler_name} does not exist.")
        return False
    # The dictionary is stored as a 0-d object array; ``.item()`` unwraps it.
    # ``.all()`` must not be used: on NumPy >= 2.0 it returns a plain bool for
    # object arrays instead of the stored dictionary.
    config = np.load(config_file, allow_pickle=True).item()
    # Load existing TOML config or start new
    profile_path = config["PROFILE_PATH"]
    data = toml.load(profile_path) if os.path.exists(profile_path) else {}
    # Set active profile
    profile_name = config["PROFILE_NAME"]
    data["active"] = profile_name
    # Set config under [profiles.<profile_name>]
    data.setdefault("profiles", {})[profile_name] = {
        "PREFECT_API_URL": config["SERVER_URL"],
        "PREFECT_HOME": config["PREFECT_HOME"],
        "PREFECT_API_DATABASE_CONNECTION_URL": config[
            "PREFECT_API_DATABASE_CONNECTION_URL"
        ],
    }
    with open(profile_path, "w") as f:
        toml.dump(data, f)
    print(f"Prefect profile '{profile_name}' written to {profile_path}")
    return profile_path
def save_prefect_env_to_file(scheduler_name="local"):
    """
    Save current Prefect server env config to a .env file for reuse.

    Also writes the dashboard URL to ``<cachedir>/prefect.dashboard`` (only if
    that file does not exist yet) and refreshes the prefect profile file.

    Parameters
    ----------
    scheduler_name : str, optional
        Scheduler name

    Returns
    -------
    str
        Profile file
    str
        Environment file
    str
        Dashboard file

    Notes
    -----
    If the configuration file does not exist, a message is printed and a
    single ``False`` is returned instead of the three values above.
    """
    cachedir = f"{get_cachedir()}/prefect_{scheduler_name}"
    os.makedirs(cachedir, exist_ok=True)
    config_file = f"{cachedir}/prefect.config.npy"
    if not os.path.exists(config_file):
        print(f"Configuration file for job ID: {scheduler_name} does not exist.")
        return False
    # The dictionary is stored as a 0-d object array; ``.item()`` unwraps it.
    # ``.all()`` must not be used: on NumPy >= 2.0 it returns a plain bool for
    # object arrays instead of the stored dictionary.
    config = np.load(config_file, allow_pickle=True).item()
    cachedir = config["CACHEDIR"]
    env_file = config["ENV_FILE"]
    dashboard = f"{cachedir}/prefect.dashboard"
    # One KEY=VALUE line per setting, in a fixed order
    env_lines = [
        f"PREFECT_HOME={config['PREFECT_HOME']}\n",
        "PREFECT_API_MODE=server\n",
        f"PREFECT_API_DATABASE_CONNECTION_URL={config['PREFECT_API_DATABASE_CONNECTION_URL']}\n",
        "PREFECT_SERVER_ALLOW_EPHEMERAL_MODE=false\n",
        f"PREFECT_API_URL={config['SERVER_URL']}\n",
        f"PREFECT_PROFILE={config['PROFILE_NAME']}\n",
        f"PREFECT_PROFILES_PATH={config['PROFILE_PATH']}\n",
        f"PREFECT_LOCAL_STORAGE_PATH={config['STORAGE']}\n",
        f"PREFECT_LOGGING_SETTINGS_PATH={config['LOGGING_PATH']}\n",
        f"PREFECT_MEMO_STORE_PATH={config['MEMO_PATH']}\n",
    ]
    with open(env_file, "w") as f:
        f.writelines(env_lines)
    print(f"Saved Prefect server environment to {env_file}")
    # An existing dashboard file is left untouched
    if not os.path.exists(dashboard):
        with open(dashboard, "w") as f:
            f.write(f"{config['SERVER_DASHBOARD']}")
    profile_path = write_prefect_profile(scheduler_name=scheduler_name)
    return profile_path, env_file, dashboard
def start_prefect_server(
    port, postgres_port, show_config=False, scheduler_name="local"
):
    """
    Start prefect server if it is not running

    Parameters
    ----------
    port : int
        Free port number
    postgres_port : int
        Postgres port
    show_config : bool, optional
        Show configuration of prefect server
    scheduler_name : str, optional
        Scheduler name

    Returns
    -------
    int
        Success message (0: server is responding, 1: server could not be
        configured or did not respond in time)
    str
        Configuration file
    str
        Profile file
    str
        Environment file
    str
        Dashboard file
    str
        Server process ID file

    Notes
    -----
    If the prefect configuration cannot be created (``prefect_config`` returns
    ``None``), the status is 1 and all file entries are ``None``.
    """
    # Number of status polls and pause between them while waiting for start-up
    poll_attempts = 180
    poll_interval = 5  # seconds

    kill_port(int(port))

    # Give Postgres a few attempts to come up
    for _ in range(5):
        if run_postgres(
            postgres_port=postgres_port,
            verbose=True,
        ):
            break

    configured = prefect_config(port, postgres_port, scheduler_name=scheduler_name)
    if configured is None:
        # prefect_config has already printed the reason; unpacking None would
        # otherwise raise a TypeError here
        return 1, None, None, None, None, None
    config_file, config = configured
    pid_file = config["PID_FILE"]
    env = get_prefect_env(scheduler_name=scheduler_name)

    print("Starting Prefect server...")
    if prefect_server_status(scheduler_name=scheduler_name):
        print(f"Server is already running at port: {port}")
        server_started = True
        new_start = False
    else:
        # The child process inherits the log file descriptor, so the handle
        # can be closed in this process right after launching
        with open(config["LOG_FILE"], "w") as f:
            server_proc = subprocess.Popen(
                [
                    "prefect",
                    "server",
                    "start",
                    "--host",
                    config["SERVER_HOST"],
                    "--port",
                    config["SERVER_PORT"],
                    "--log-level",
                    "INFO",
                ],
                stdout=f,
                stderr=subprocess.STDOUT,
                env=env,
            )
        server_started = False
        new_start = True

    os.makedirs(config["PREFECT_HOME"], exist_ok=True)
    profile_path, env_file, dashboard = save_prefect_env_to_file(
        scheduler_name=scheduler_name
    )

    if not server_started:
        # Poll until the server accepts connections
        for _ in range(poll_attempts):
            if prefect_server_status(scheduler_name=scheduler_name):
                server_started = True
                break
            time.sleep(poll_interval)

    if not server_started:
        waited_minutes = poll_attempts * poll_interval // 60
        print(
            f"Server did not respond within {waited_minutes} minutes. Check logs at {config['LOG_FILE']} for more details"
        )
        # Non-zero status so that callers can tell failure from success
        return 1, config_file, profile_path, env_file, dashboard, pid_file

    if new_start:
        # Only record the PID of a server launched by this call
        with open(pid_file, "w") as pf:
            pf.write(str(server_proc.pid))
    print(
        "##########################################################################"
    )
    if scheduler_name != "local":
        print(
            f"First tunnel to prefect from your local machine: ssh -N -L {port}:localhost:{port} <username>@<remote.cluster.name>"
        )
    print(
        f"Prefect server dashboard for monitoring is available at local machine: http://localhost:{port}/dashboard"
    )
    print(
        "##########################################################################"
    )
    if not os.path.exists(dashboard):
        with open(dashboard, "w") as f:
            f.write(f"{config['SERVER_DASHBOARD']}")
    if show_config:
        show_prefect_config(scheduler_name=scheduler_name)
    return 0, config_file, profile_path, env_file, dashboard, pid_file
######################################### # Stop prefect server ##########################################
def stop_prefect_server(scheduler_name="local"):
    """
    Stop prefect server running in the current installation

    Note: it will only stop prefect server which is running from the current installation
    For this pipeline, a port between 4260 to 5250 is chosen for P-AIRCARS.

    The steps are: run ``prefect server stop``, send SIGTERM to the PID stored
    in the PID file (if any), close the prefect server port, stop Postgres, and
    finally remove the scheduler cache directory.

    Parameters
    ----------
    scheduler_name : str, optional
        Scheduler name

    Returns
    -------
    int
        Success message (0: stopped, 1: configuration file missing, prefect
        server port could not be closed, or an unexpected error occurred)
    """
    cachedir = f"{get_cachedir()}/prefect_{scheduler_name}"
    os.makedirs(cachedir, exist_ok=True)
    config_file = f"{cachedir}/prefect.config.npy"
    if not os.path.exists(config_file):
        print(f"Configuration file for job ID: {scheduler_name} does not exist.")
        # rmtree instead of a shell "rm -rf": safe for paths containing
        # spaces or shell metacharacters
        shutil.rmtree(cachedir, ignore_errors=True)
        return 1
    # The dictionary is stored as a 0-d object array; ``.item()`` unwraps it.
    # ``.all()`` must not be used: on NumPy >= 2.0 it returns a plain bool for
    # object arrays instead of the stored dictionary.
    config = np.load(config_file, allow_pickle=True).item()
    # NOTE: this overrides variables of the current process environment
    load_dotenv(dotenv_path=config["ENV_FILE"], override=True)
    env = os.environ.copy()
    print("Stopping Prefect server ...")
    result = subprocess.run(
        ["prefect", "server", "stop"],
        env=env,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    if result.returncode == 0:
        print("Prefect server stopped.")
    else:
        print("Could not stop prefect server.")

    pid_file = config["PID_FILE"]
    cachedir = config["CACHEDIR"]
    msg = 0
    try:
        if os.path.exists(pid_file):
            with open(pid_file, "r") as f:
                pid = int(f.read().strip())
            print(f"Stopping Prefect server with PID {pid} ...")
            try:
                os.kill(pid, signal.SIGTERM)
            except ProcessLookupError:
                # The process is already gone (e.g. stopped by the command
                # above); the ports and Postgres still have to be cleaned up
                print(
                    f"No such process with PID {pid}. Removing stale {cachedir} directory."
                )

        try:
            print(f"Closing prefect server port: {config['SERVER_PORT']}")
            kill_port(int(config["SERVER_PORT"]))
            print(f"Closed port: {config['SERVER_PORT']}")
        except Exception:
            traceback.print_exc()
            msg = 1
            print(f"Could not close prefect server port: {config['SERVER_PORT']}")

        # The port is the text between the last ":" and the following "/"
        # of the database connection URL
        postgres_port = int(
            config["PREFECT_API_DATABASE_CONNECTION_URL"].split(":")[-1].split("/")[0]
        )
        try:
            print(f"Closing postgreSQL server port: {postgres_port}")
            killed = kill_postgres(
                postgres_port=postgres_port,
            )
            if not killed:
                kill_port(int(postgres_port))
            print(f"Closed port: {postgres_port}")
        except Exception:
            # Best effort: a failure here does not change the return status
            traceback.print_exc()
            print(f"Could not close postgreSQL server port: {postgres_port}")
        print(f"Server stopped and {cachedir} removed.")
    except Exception:
        print("Error stopping server")
        traceback.print_exc()
        msg = 1
    finally:
        # Always drop the scheduler cache directory, whatever happened above
        shutil.rmtree(cachedir, ignore_errors=True)
    return msg
############################################ # Prefect server status ############################################
def prefect_server_status(scheduler_name="local"):
    """
    Get prefect server status

    The server is considered running when a TCP connection to the configured
    host and port can be opened.

    Parameters
    ----------
    scheduler_name : str, optional
        Scheduler name

    Returns
    -------
    bool
        ``True`` if the connection succeeds; ``False`` if it fails or if the
        configuration file does not exist
    """
    cachedir = f"{get_cachedir()}/prefect_{scheduler_name}"
    os.makedirs(cachedir, exist_ok=True)
    config_file = f"{cachedir}/prefect.config.npy"
    if not os.path.exists(config_file):
        print(f"Configuration file for job ID: {scheduler_name} does not exist.")
        return False
    # The dictionary is stored as a 0-d object array; ``.item()`` unwraps it.
    # ``.all()`` must not be used: on NumPy >= 2.0 it returns a plain bool for
    # object arrays instead of the stored dictionary.
    config = np.load(config_file, allow_pickle=True).item()
    try:
        # The connection is closed again immediately; only reachability matters
        with socket.create_connection(
            (config["SERVER_HOST"], config["SERVER_PORT"]), timeout=60
        ):
            return True
    except OSError:
        return False
def get_prefect_env(scheduler_name="local"):
    """
    Get environment variables of prefect

    Returns a copy of the current process environment extended with the
    prefect settings of the given scheduler. ``os.environ`` itself is not
    modified.

    Parameters
    ----------
    scheduler_name : str, optional
        Scheduler name

    Returns
    -------
    dict
        Environment dictionary, or ``None`` if the configuration file does not
        exist
    """
    cachedir = f"{get_cachedir()}/prefect_{scheduler_name}"
    os.makedirs(cachedir, exist_ok=True)
    config_file = f"{cachedir}/prefect.config.npy"
    if not os.path.exists(config_file):
        print(f"Configuration file for job ID: {scheduler_name} does not exist.")
        return
    # The dictionary is stored as a 0-d object array; ``.item()`` unwraps it.
    # ``.all()`` must not be used: on NumPy >= 2.0 it returns a plain bool for
    # object arrays instead of the stored dictionary.
    config = np.load(config_file, allow_pickle=True).item()
    env = os.environ.copy()
    env.update(
        {
            "PREFECT_HOME": config["PREFECT_HOME"],
            "PREFECT_API_MODE": "server",
            "PREFECT_API_DATABASE_CONNECTION_URL": config[
                "PREFECT_API_DATABASE_CONNECTION_URL"
            ],
            "PREFECT_SERVER_ALLOW_EPHEMERAL_MODE": "false",
            "PREFECT_API_URL": config["SERVER_URL"],
            "PREFECT_PROFILE": config["PROFILE_NAME"],
            "PREFECT_PROFILES_PATH": config["PROFILE_PATH"],
            "PREFECT_LOCAL_STORAGE_PATH": config["STORAGE"],
            "PREFECT_LOGGING_SETTINGS_PATH": config["LOGGING_PATH"],
            "PREFECT_MEMO_STORE_PATH": config["MEMO_PATH"],
        }
    )
    return env
def show_prefect_config(scheduler_name="local"):
    """
    Print the effective Prefect config in this environment.

    Loads the scheduler ``.env`` file into the current process environment
    (overriding existing variables) and runs ``prefect config view`` with it.
    Nothing is shown if the configuration file does not exist.

    Parameters
    ----------
    scheduler_name : str, optional
        Scheduler name
    """
    cachedir = f"{get_cachedir()}/prefect_{scheduler_name}"
    os.makedirs(cachedir, exist_ok=True)
    config_file = f"{cachedir}/prefect.config.npy"
    if not os.path.exists(config_file):
        print(f"Configuration file for job ID: {scheduler_name} does not exist.")
        return
    # The dictionary is stored as a 0-d object array; ``.item()`` unwraps it.
    # ``.all()`` must not be used: on NumPy >= 2.0 it returns a plain bool for
    # object arrays instead of the stored dictionary.
    config = np.load(config_file, allow_pickle=True).item()
    # NOTE: this overrides variables of the current process environment
    load_dotenv(dotenv_path=config["ENV_FILE"], override=True)
    env = os.environ.copy()
    print("Prefect config in current environment ...")
    subprocess.run(["prefect", "config", "view"], env=env)