Untitled
unknown
python
2 years ago
5.4 kB
3
Indexable
import argparse import logging import yaml import time import random import multiprocessing as mp from watchdog.observers import Observer from watchdog.observers.polling import PollingObserver from watchdog.events import LoggingEventHandler, FileSystemEventHandler # from orchestrator.typing.orchestrator import OrchestratorConfig logger = logging.getLogger(__name__) class FSEHandler(FileSystemEventHandler): def __init__(self, queue: mp.Queue): self.queue = queue def on_created(self, event): logging.info(f'EventType: {event.event_type}, IsDir: {event.is_directory}, SRC: {event.src_path}') # Checking if the event is what we want: if not event.is_directory and event.event_type == 'created': self.queue.put(event) def watch_path2(queue: mp.Queue, path: str): logging.info(f'Starting the watch process: {queue}, path: {path}') event_handler = FSEHandler(queue=queue) observer = PollingObserver() observer.schedule(event_handler, path, recursive=True) observer.start() try: logging.info('Inside run') i = 0 while True: logging.debug(f'Running.. {i}') time.sleep(1) i += 1 observer.join() logging.info('Watcher joined') finally: observer.stop() observer.join() def event_handler(queue: mp.Queue, params: dict): logging.debug(f'Passed values: {params}, object id: {id(params)}') logging.info('Starting new task.') # We enter an indefinite loop while True: # Wait for new events to happen logging.info('Waiting for new task.') event = queue.get() # Do something, anything if event is not None: work_time = random.randint(4, 10) logging.info(f'Started working on:{event} for {work_time}s') time.sleep(work_time) logging.info(f'Finish working on:{event}') class Orchestrator: def __init__(self, watch_path: str, nproc: int): self.watch_path = watch_path self.nproc = nproc self.queue = mp.Queue() self.watcher = None self.workers = [] self.test_params = {} def setup(self): # watch_path = pathlib.Path(self.config.watch_directory) # logging.debug(watch_path) queue = mp.Queue() logging.debug(self.watch_path) self.watcher = mp.Process(target=watch_path2, name='Watcher', args=(queue, self.watch_path)) # self.watcher = mp.Process(target=watch_path2, args=(queue, 'E:\\code\\alchemy\\devenv\\vagrant\\orchestrator\\data_dir')) # start the watcher process # watch_path2(queue, self.config.watch_directory) self.watcher.start() for i in range(self.nproc): worker_name = f'Worker{i+1}' logging.info(f'Spawning process: {worker_name}') # This is a random set of params sent to each handler. Use this to send models, variables to be loaded # once. params = {'name': worker_name, 'test_params': self.test_params} p = mp.Process(target=self.handle_event, name=worker_name, args=(queue, params,)) # p = mp.Process(target=event_handler, name=worker_name, args=(queue, params,)) self.workers.append(p) p.start() def run(self): """ This function ensures that everything keeps running. We simply wait for the watcher and the workers to join (which should never happen). """ try: logging.info('Inside run') self.watcher.join() logging.info('Watcher joined') for p in self.workers: p.join() self.observer.join() except KeyboardInterrupt: # Handle the case when we want to stop the program. Gracefully send SIGTERM to individual processes, # then everything shuts down. logging.info("Caught KeyboardInterrupt, terminating all processes") self.watcher.terminate() self.watcher.join() for p in self.workers: p.terminate() p.join() def handle_event(self, queue, params: dict): # We enter an indefinite loop while True: # Wait for new events to happen event = queue.get() # Do something, anything if event is not None: work_time = random.randint(4, 10) logging.info(f'Started working on:{event} for {work_time}s') time.sleep(work_time) logging.info(f'Finish working on:{event}') def main(): logging.basicConfig(level='DEBUG', format='%(asctime)s - %(name)s - %(lineno)d - %(processName)s - %(levelname)s - %(message)s') path = "/home/vagrant/orchestrator/data_dir" orchestrator = Orchestrator(watch_path=path, nproc=3) orchestrator.setup() orchestrator.run() if __name__ == '__main__': main()
Editor is loading...