Untitled
unknown
plain_text
20 days ago
20 kB
5
Indexable
import subprocess import selectors import threading import os import signal import time from datetime import datetime from pathlib import Path import sys import yaml from concurrent.futures import ThreadPoolExecutor # === LOAD CONFIG === with open("config.yaml", "r") as f: CONFIG = yaml.safe_load(f) SCRIPTS = CONFIG["scripts"] STOP_FOLDER = CONFIG["stop_folder"] POLL_INTERVAL = CONFIG["poll_interval"] LOG_DIR = CONFIG["log_dir"] os.makedirs(LOG_DIR, exist_ok=True) # === GLOBALS === processes = {} stop_event = threading.Event() executor = ThreadPoolExecutor(max_workers=len(SCRIPTS) * 2) def timestamp(): return datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") def monitor_output(proc, script_name): sel = selectors.DefaultSelector() sel.register(proc.stdout, selectors.EVENT_READ) sel.register(proc.stderr, selectors.EVENT_READ) log_file_path = os.path.join(LOG_DIR, f"{script_name}.log") with open(log_file_path, "a") as log_file: while True: for key, _ in sel.select(): line = key.fileobj.readline() if not line: return line_decoded = line.decode().rstrip() level = "ERROR" if key.fileobj is proc.stderr else "INFO" log_entry = f"{timestamp()} {level}: {line_decoded}" log_file.write(log_entry + "\n") log_file.flush() def run_script(script_name): proc = subprocess.Popen( [sys.executable, script_name], stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=1, ) processes[script_name] = proc executor.submit(monitor_output, proc, script_name) def check_stop_files(): while not stop_event.is_set(): if any(Path(STOP_FOLDER).glob("*")): print(f"{timestamp()} INFO: Stop file detected.") stop_event.set() break time.sleep(POLL_INTERVAL) def handle_stop_signal(signum, frame): print(f"{timestamp()} INFO: Stop signal ({signum}) received. Stopping...") stop_event.set() def stop_all_processes(): for script_name, proc in processes.items(): if proc.poll() is None: print(f"{timestamp()} INFO: Terminating {script_name}") proc.terminate() time.sleep(3) for script_name, proc in processes.items(): if proc.poll() is None: print(f"{timestamp()} INFO: Killing {script_name}") proc.kill() def main(): signal.signal(signal.SIGTERM, handle_stop_signal) signal.signal(signal.SIGINT, handle_stop_signal) threading.Thread(target=check_stop_files, daemon=True).start() for script in SCRIPTS: run_script(script) try: while not stop_event.is_set(): if all(proc.poll() is not None for proc in processes.values()): break time.sleep(1) finally: stop_all_processes() executor.shutdown(wait=True) error_found = False for script_name, proc in processes.items(): return_code = proc.wait() print(f"{timestamp()} INFO: {script_name} exited with code {return_code}") if return_code != 0: error_found = True sys.exit(1 if error_found else 0) if __name__ == "__main__": main() )
Editor is loading...
Leave a Comment