Untitled
unknown
plain_text
8 months ago
20 kB
6
Indexable
import os
import selectors
import signal
import subprocess
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from pathlib import Path

import yaml
# === LOAD CONFIG ===
# YAML is UTF-8 by specification, so decode it as such instead of relying on
# the platform's locale encoding.
with open("config.yaml", "r", encoding="utf-8") as f:
    CONFIG = yaml.safe_load(f)

SCRIPTS = CONFIG["scripts"]              # scripts to launch, one child process each
STOP_FOLDER = CONFIG["stop_folder"]      # any entry appearing here requests a stop
POLL_INTERVAL = CONFIG["poll_interval"]  # seconds between checks of STOP_FOLDER
LOG_DIR = CONFIG["log_dir"]              # directory receiving one log per script
os.makedirs(LOG_DIR, exist_ok=True)

# === GLOBALS ===
processes = {}  # script name -> subprocess.Popen of the running child
stop_event = threading.Event()  # set once a stop file or stop signal is seen
# ThreadPoolExecutor raises ValueError for max_workers <= 0, so keep at least
# one worker even when the configuration lists no scripts.
executor = ThreadPoolExecutor(max_workers=max(1, len(SCRIPTS) * 2))
def timestamp():
    """Return the current UTC time formatted as ``YYYY-MM-DD HH:MM:SS``.

    Built from an aware ``datetime.now(timezone.utc)`` rather than
    ``datetime.utcnow()``, which is deprecated since Python 3.12. The
    formatted string is the same.
    """
    return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
def monitor_output(proc, script_name):
    """Copy a child's stdout and stderr into ``<LOG_DIR>/<script_name>.log``.

    Every output line is appended as ``<timestamp> <LEVEL>: <text>``, with
    level ``INFO`` for stdout and ``ERROR`` for stderr. The call blocks until
    BOTH pipes have reached end-of-file, so it is meant to run on a worker
    thread.

    Args:
        proc: A ``subprocess.Popen`` whose ``stdout`` and ``stderr`` are
            binary pipes.
        script_name: Name used to build the log file name.

    NOTE: this relies on ``selectors`` accepting pipe file descriptors, which
    is the case on POSIX systems but not on Windows.
    """
    log_file_path = os.path.join(LOG_DIR, f"{script_name}.log")
    pending = {}  # fd -> bytes of a line whose newline has not arrived yet

    # The log is written as UTF-8 so that any character decoded from the
    # child's output (including U+FFFD replacements) can always be encoded.
    with selectors.DefaultSelector() as sel, open(
        log_file_path, "a", encoding="utf-8"
    ) as log_file:

        def write_line(level, raw_line):
            # Undecodable bytes are replaced rather than raised on: an
            # exception here would end the monitor and leave the pipe undrained.
            text = raw_line.decode(errors="replace").rstrip()
            log_file.write(f"{timestamp()} {level}: {text}\n")

        for stream, level in ((proc.stdout, "INFO"), (proc.stderr, "ERROR")):
            key = sel.register(stream, selectors.EVENT_READ, data=level)
            pending[key.fd] = b""

        # Keep going until every pipe has been unregistered at its own EOF;
        # one stream closing must not cut off the other.
        while sel.get_map():
            for key, _ in sel.select():
                # Read straight from the descriptor. Going through the
                # buffered file object could pull several lines into its
                # private buffer where select() cannot see them.
                chunk = os.read(key.fd, 65536)
                if chunk:
                    *lines, pending[key.fd] = (pending[key.fd] + chunk).split(b"\n")
                else:
                    # EOF: emit a trailing line that had no newline, then
                    # stop watching this pipe.
                    lines = [pending[key.fd]] if pending[key.fd] else []
                    pending[key.fd] = b""
                    sel.unregister(key.fileobj)
                for raw_line in lines:
                    write_line(key.data, raw_line)
            log_file.flush()
def run_script(script_name):
    """Start ``script_name`` as a child process and begin logging its output.

    The script is run with the current interpreter (``sys.executable``) with
    stdout and stderr captured through pipes. The resulting ``Popen`` object
    is stored in the module-level ``processes`` dict under ``script_name``,
    and a ``monitor_output`` task for it is submitted to ``executor``.

    Args:
        script_name: Path of the Python script to run; also the key under
            which the process is registered.
    """
    # ``bufsize=1`` is intentionally not passed: line buffering exists only
    # for text-mode pipes. On these binary pipes it merely triggers a
    # RuntimeWarning and the default buffer size is used anyway.
    proc = subprocess.Popen(
        [sys.executable, script_name],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    processes[script_name] = proc
    executor.submit(monitor_output, proc, script_name)
def check_stop_files():
    """Watch ``STOP_FOLDER`` and request a stop as soon as it has any entry.

    Loops until ``stop_event`` is set, either by this function or by someone
    else. On each pass the folder is globbed for ``*``; if anything matches,
    a message is printed, ``stop_event`` is set and the function returns.
    Otherwise it sleeps ``POLL_INTERVAL`` seconds before checking again.
    """
    while True:
        if stop_event.is_set():
            return

        stop_requested = any(Path(STOP_FOLDER).glob("*"))
        if stop_requested:
            print(f"{timestamp()} INFO: Stop file detected.")
            stop_event.set()
            return

        time.sleep(POLL_INTERVAL)
def handle_stop_signal(signum, frame):
    """Signal handler: announce the received signal and request a stop.

    Args:
        signum: Number of the signal that was delivered.
        frame: Current stack frame supplied by the ``signal`` module (unused).
    """
    notice = f"{timestamp()} INFO: Stop signal ({signum}) received. Stopping..."
    print(notice)
    stop_event.set()
def stop_all_processes(grace_period=3):
    """Terminate every still-running child, killing those that do not exit.

    Each process in ``processes`` that is still alive is sent ``terminate()``.
    The terminated processes then share a single ``grace_period`` in which to
    exit; any that is still running when the deadline passes is sent
    ``kill()``. When nothing is running the function returns immediately
    instead of sleeping for the whole grace period.

    Args:
        grace_period: Total number of seconds to wait after ``terminate()``
            before resorting to ``kill()``. Defaults to 3.
    """
    terminated = []
    for script_name, proc in processes.items():
        if proc.poll() is None:
            print(f"{timestamp()} INFO: Terminating {script_name}")
            proc.terminate()
            terminated.append((script_name, proc))

    # One deadline for all children, so the total wait never exceeds
    # grace_period no matter how many processes were terminated.
    deadline = time.monotonic() + grace_period
    for script_name, proc in terminated:
        remaining = max(0.0, deadline - time.monotonic())
        try:
            proc.wait(timeout=remaining)
        except subprocess.TimeoutExpired:
            print(f"{timestamp()} INFO: Killing {script_name}")
            proc.kill()
def main():
    """Run all configured scripts, supervise them, and exit with a status.

    Installs ``handle_stop_signal`` for SIGTERM and SIGINT, starts the
    stop-file watcher on a daemon thread, launches every script in
    ``SCRIPTS``, and then polls once per second until either a stop is
    requested or every child has exited. Children are always stopped and the
    executor is always shut down on the way out.

    Exits the interpreter with status 1 if any child returned a non-zero
    code, otherwise with status 0.
    """
    signal.signal(signal.SIGTERM, handle_stop_signal)
    signal.signal(signal.SIGINT, handle_stop_signal)
    threading.Thread(target=check_stop_files, daemon=True).start()

    try:
        # Launch inside the ``try`` so that a failure part-way through the
        # list still stops the children that were already started.
        for script in SCRIPTS:
            run_script(script)

        # Poll with is_set() + sleep rather than stop_event.wait(): the
        # signal handler calls stop_event.set() on this same thread, and
        # polling keeps this thread from ever holding the Event's internal
        # lock when the handler runs.
        while not stop_event.is_set():
            if all(proc.poll() is not None for proc in processes.values()):
                break
            time.sleep(1)
    finally:
        stop_all_processes()
        executor.shutdown(wait=True)

    error_found = False
    for script_name, proc in processes.items():
        return_code = proc.wait()
        print(f"{timestamp()} INFO: {script_name} exited with code {return_code}")
        if return_code != 0:
            error_found = True
    sys.exit(1 if error_found else 0)
# Start the supervisor only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()
)Editor is loading...
Leave a Comment