Untitled

 avatar
unknown
plain_text
20 days ago
20 kB
5
Indexable
import os
import selectors
import signal
import subprocess
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from pathlib import Path

import yaml

# === LOAD CONFIG ===
# config.yaml is read from the current working directory and must provide the
# keys "scripts", "stop_folder", "poll_interval" and "log_dir".
with open("config.yaml", "r", encoding="utf-8") as f:
    CONFIG = yaml.safe_load(f)

# Only the parse above needs the open file; everything else runs after the
# handle has been closed.
SCRIPTS = CONFIG["scripts"]
STOP_FOLDER = CONFIG["stop_folder"]
POLL_INTERVAL = CONFIG["poll_interval"]
LOG_DIR = CONFIG["log_dir"]

os.makedirs(LOG_DIR, exist_ok=True)

# === GLOBALS ===
# Popen handle of every launched script, keyed by script name.
processes = {}
# Set to request shutdown of the supervisor loop.
stop_event = threading.Event()
# ThreadPoolExecutor raises ValueError for max_workers <= 0, so keep at least
# one worker even when the config lists no scripts.
executor = ThreadPoolExecutor(max_workers=max(1, len(SCRIPTS) * 2))


    def timestamp():
        return datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")


        def monitor_output(proc, script_name):
            sel = selectors.DefaultSelector()
                sel.register(proc.stdout, selectors.EVENT_READ)
                    sel.register(proc.stderr, selectors.EVENT_READ)

                        log_file_path = os.path.join(LOG_DIR, f"{script_name}.log")
                            with open(log_file_path, "a") as log_file:
                                    while True:
                                                for key, _ in sel.select():
                                                                line = key.fileobj.readline()
                                                                                if not line:
                                                                                                    return
                                                                                                                    line_decoded = line.decode().rstrip()
                                                                                                                                    level = "ERROR" if key.fileobj is proc.stderr else "INFO"
                                                                                                                                                    log_entry = f"{timestamp()} {level}: {line_decoded}"
                                                                                                                                                                    log_file.write(log_entry + "\n")
                                                                                                                                                                                    log_file.flush()


                                                                                                                                                                                    def run_script(script_name):
                                                                                                                                                                                        proc = subprocess.Popen(
                                                                                                                                                                                                    [sys.executable, script_name],
                                                                                                                                                                                                            stdout=subprocess.PIPE,
                                                                                                                                                                                                                    stderr=subprocess.PIPE,
                                                                                                                                                                                                                            bufsize=1,
                                                                                                                                                                                        )
                                                                                                                                                                                            processes[script_name] = proc
                                                                                                                                                                                                executor.submit(monitor_output, proc, script_name)


                                                                                                                                                                                                def check_stop_files():
                                                                                                                                                                                                    while not stop_event.is_set():
                                                                                                                                                                                                            if any(Path(STOP_FOLDER).glob("*")):
                                                                                                                                                                                                                        print(f"{timestamp()} INFO: Stop file detected.")
                                                                                                                                                                                                                                    stop_event.set()
                                                                                                                                                                                                                                                break
                                                                                                                                                                                                                                                        time.sleep(POLL_INTERVAL)


                                                                                                                                                                                                                                                        def handle_stop_signal(signum, frame):
                                                                                                                                                                                                                                                            print(f"{timestamp()} INFO: Stop signal ({signum}) received. Stopping...")
                                                                                                                                                                                                                                                                stop_event.set()


                                                                                                                                                                                                                                                                def stop_all_processes():
                                                                                                                                                                                                                                                                    for script_name, proc in processes.items():
                                                                                                                                                                                                                                                                            if proc.poll() is None:
                                                                                                                                                                                                                                                                                        print(f"{timestamp()} INFO: Terminating {script_name}")
                                                                                                                                                                                                                                                                                                    proc.terminate()
                                                                                                                                                                                                                                                                                                        time.sleep(3)
                                                                                                                                                                                                                                                                                                            for script_name, proc in processes.items():
                                                                                                                                                                                                                                                                                                                    if proc.poll() is None:
                                                                                                                                                                                                                                                                                                                                print(f"{timestamp()} INFO: Killing {script_name}")
                                                                                                                                                                                                                                                                                                                                            proc.kill()


                                                                                                                                                                                                                                                                                                                                            def main():
                                                                                                                                                                                                                                                                                                                                                signal.signal(signal.SIGTERM, handle_stop_signal)
                                                                                                                                                                                                                                                                                                                                                    signal.signal(signal.SIGINT, handle_stop_signal)

                                                                                                                                                                                                                                                                                                                                                        threading.Thread(target=check_stop_files, daemon=True).start()

                                                                                                                                                                                                                                                                                                                                                            for script in SCRIPTS:
                                                                                                                                                                                                                                                                                                                                                                    run_script(script)

                                                                                                                                                                                                                                                                                                                                                                        try:
                                                                                                                                                                                                                                                                                                                                                                                while not stop_event.is_set():
                                                                                                                                                                                                                                                                                                                                                                                            if all(proc.poll() is not None for proc in processes.values()):
                                                                                                                                                                                                                                                                                                                                                                                                            break
                                                                                                                                                                                                                                                                                                                                                                                                                        time.sleep(1)
                                                                                                                                                                                                                                                                                                                                                                                                                            finally:
                                                                                                                                                                                                                                                                                                                                                                                                                                    stop_all_processes()
                                                                                                                                                                                                                                                                                                                                                                                                                                            executor.shutdown(wait=True)

                                                                                                                                                                                                                                                                                                                                                                                                                                                error_found = False
                                                                                                                                                                                                                                                                                                                                                                                                                                                    for script_name, proc in processes.items():
                                                                                                                                                                                                                                                                                                                                                                                                                                                            return_code = proc.wait()
                                                                                                                                                                                                                                                                                                                                                                                                                                                                    print(f"{timestamp()} INFO: {script_name} exited with code {return_code}")
                                                                                                                                                                                                                                                                                                                                                                                                                                                                            if return_code != 0:
                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        error_found = True

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            sys.exit(1 if error_found else 0)


                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            if __name__ == "__main__":
                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                main()
                                                                                                                                                                                        )
Editor is loading...
Leave a Comment