# Paste-site metadata (not program code): "Untitled", unknown, python, 3 years ago, 5.4 kB, 6, Indexable
import argparse
import logging
import yaml
import time
import random
import multiprocessing as mp
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver
from watchdog.events import LoggingEventHandler, FileSystemEventHandler
# from orchestrator.typing.orchestrator import OrchestratorConfig
logger = logging.getLogger(__name__)
class FSEHandler(FileSystemEventHandler):
    """Filesystem event handler that forwards file-creation events to a queue.

    Every ``created`` event is logged; only those that refer to a file
    (not a directory) are put on the queue.
    """

    def __init__(self, queue: mp.Queue):
        """Store the queue that receives forwarded events.

        Args:
            queue: Queue on which file-creation events are placed.
        """
        # Let the base class initialise itself before adding our own state.
        super().__init__()
        self.queue = queue

    def on_created(self, event):
        """Log the event and enqueue it when it is a file creation.

        Args:
            event: Event object exposing ``event_type``, ``is_directory``
                and ``src_path``.
        """
        logging.info(f'EventType: {event.event_type}, IsDir: {event.is_directory}, SRC: {event.src_path}')
        # Directories are ignored; only newly created files are forwarded.
        if not event.is_directory and event.event_type == 'created':
            self.queue.put(event)
def watch_path2(queue: mp.Queue, path: str):
    """Watch ``path`` recursively and forward file-creation events to ``queue``.

    Starts a ``PollingObserver`` with an ``FSEHandler`` bound to ``queue`` and
    then blocks for as long as the observer thread is alive, emitting a debug
    heartbeat roughly once per second. The observer is always stopped and
    joined on the way out, including when an exception interrupts the wait.

    Args:
        queue: Queue handed to ``FSEHandler``; receives the forwarded events.
        path: Directory to watch (sub-directories included).
    """
    logging.info(f'Starting the watch process: {queue}, path: {path}')
    # Named ``handler`` so the module-level ``event_handler`` function is not shadowed.
    handler = FSEHandler(queue=queue)
    observer = PollingObserver()
    observer.schedule(handler, path, recursive=True)
    observer.start()
    try:
        logging.info('Inside run')
        tick = 0
        # Wait on the observer thread in one-second slices: this keeps the
        # heartbeat log and lets the function return if the observer thread
        # ever exits, instead of spinning forever on a dead observer.
        while observer.is_alive():
            logging.debug(f'Running.. {tick}')
            observer.join(timeout=1)
            tick += 1
        logging.info('Watcher joined')
    finally:
        observer.stop()
        observer.join()
def event_handler(queue: mp.Queue, params: dict):
    """Consume events from ``queue`` forever, simulating work on each one.

    Each non-``None`` item taken from the queue is "processed" by sleeping
    for a random whole number of seconds between 4 and 10. ``None`` items
    are skipped. The function never returns on its own.

    Args:
        queue: Source of events; ``get()`` is called once per iteration.
        params: Per-worker parameters; only logged here.
    """
    logging.debug(f'Passed values: {params}, object id: {id(params)}')
    logging.info('Starting new task.')
    while True:
        # Block until something arrives on the queue.
        logging.info('Waiting for new task.')
        task = queue.get()
        if task is None:
            continue
        # Placeholder workload: sleep for a random duration.
        duration = random.randint(4, 10)
        logging.info(f'Started working on:{task} for {duration}s')
        time.sleep(duration)
        logging.info(f'Finish working on:{task}')
class Orchestrator:
    """Run one directory-watcher process and ``nproc`` worker processes.

    The watcher process runs ``watch_path2`` and feeds events into a shared
    multiprocessing queue; each worker process runs ``handle_event`` and
    consumes from that same queue.
    """

    def __init__(self, watch_path: str, nproc: int):
        """Record the configuration and create the shared queue.

        Args:
            watch_path: Directory the watcher process observes.
            nproc: Number of worker processes to spawn in ``setup``.
        """
        self.watch_path = watch_path
        self.nproc = nproc
        self.queue = mp.Queue()
        self.watcher = None
        self.workers = []
        self.test_params = {}

    def setup(self):
        """Start the watcher process and ``nproc`` worker processes."""
        # Use the queue created in __init__ so that ``self.queue`` really is
        # the channel shared by the watcher and the workers.
        queue = self.queue
        logging.debug(self.watch_path)
        self.watcher = mp.Process(target=watch_path2, name='Watcher', args=(queue, self.watch_path))
        self.watcher.start()
        for i in range(self.nproc):
            worker_name = f'Worker{i+1}'
            logging.info(f'Spawning process: {worker_name}')
            # Parameters sent to each handler. Use this to send models or
            # variables that should be loaded once per worker.
            params = {'name': worker_name, 'test_params': self.test_params}
            # NOTE(review): the target is a bound method, so under the 'spawn'
            # start method this instance has to be picklable -- confirm on
            # platforms that do not fork.
            p = mp.Process(target=self.handle_event, name=worker_name, args=(queue, params,))
            self.workers.append(p)
            p.start()

    def run(self):
        """Block until the watcher and every worker have exited.

        Under normal operation the child processes loop forever, so this
        call does not return. On ``KeyboardInterrupt`` every child process is
        terminated and then joined.
        """
        try:
            logging.info('Inside run')
            self.watcher.join()
            logging.info('Watcher joined')
            for p in self.workers:
                p.join()
        except KeyboardInterrupt:
            # Ask every child to stop first, then wait for them, so that
            # shutdown is not serialised behind one slow process.
            logging.info("Caught KeyboardInterrupt, terminating all processes")
            self.watcher.terminate()
            for p in self.workers:
                p.terminate()
            self.watcher.join()
            for p in self.workers:
                p.join()

    def handle_event(self, queue, params: dict):
        """Consume events from ``queue`` forever, simulating work on each one.

        Args:
            queue: Source of events; ``get()`` is called once per iteration.
            params: Per-worker parameters (currently unused by the loop).
        """
        while True:
            # Block until a new event arrives.
            event = queue.get()
            if event is not None:
                # Placeholder workload: sleep for a random 4-10 seconds.
                work_time = random.randint(4, 10)
                logging.info(f'Started working on:{event} for {work_time}s')
                time.sleep(work_time)
                logging.info(f'Finish working on:{event}')
def main(argv=None):
    """Configure logging, then build, set up and run an ``Orchestrator``.

    The watched directory and the number of workers can be given on the
    command line; when omitted they fall back to the previously hard-coded
    values, so running the script without arguments behaves as before.

    Args:
        argv: Optional list of command-line arguments. ``None`` makes
            argparse read them from ``sys.argv``.
    """
    parser = argparse.ArgumentParser(
        description='Watch a directory and hand newly created files to worker processes.',
    )
    parser.add_argument(
        '--watch-path',
        default="/home/vagrant/orchestrator/data_dir",
        help='directory to watch (default: %(default)s)',
    )
    parser.add_argument(
        '--nproc',
        type=int,
        default=3,
        help='number of worker processes (default: %(default)s)',
    )
    args = parser.parse_args(argv)

    logging.basicConfig(level='DEBUG', format='%(asctime)s - %(name)s - %(lineno)d - %(processName)s - %(levelname)s - %(message)s')
    orchestrator = Orchestrator(watch_path=args.watch_path, nproc=args.nproc)
    orchestrator.setup()
    orchestrator.run()
# Script entry point: only run the orchestrator when executed directly.
if __name__ == '__main__':
    main()