Untitled
unknown
python
3 years ago
5.4 kB
14
Indexable
import argparse
import logging
import yaml
import time
import random
import multiprocessing as mp
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver
from watchdog.events import LoggingEventHandler, FileSystemEventHandler
# from orchestrator.typing.orchestrator import OrchestratorConfig
logger = logging.getLogger(__name__)
class FSEHandler(FileSystemEventHandler):
def __init__(self, queue: mp.Queue):
self.queue = queue
def on_created(self, event):
logging.info(f'EventType: {event.event_type}, IsDir: {event.is_directory}, SRC: {event.src_path}')
# Checking if the event is what we want:
if not event.is_directory and event.event_type == 'created':
self.queue.put(event)
def watch_path2(queue: mp.Queue, path: str):
logging.info(f'Starting the watch process: {queue}, path: {path}')
event_handler = FSEHandler(queue=queue)
observer = PollingObserver()
observer.schedule(event_handler, path, recursive=True)
observer.start()
try:
logging.info('Inside run')
i = 0
while True:
logging.debug(f'Running.. {i}')
time.sleep(1)
i += 1
observer.join()
logging.info('Watcher joined')
finally:
observer.stop()
observer.join()
def event_handler(queue: mp.Queue, params: dict):
logging.debug(f'Passed values: {params}, object id: {id(params)}')
logging.info('Starting new task.')
# We enter an indefinite loop
while True:
# Wait for new events to happen
logging.info('Waiting for new task.')
event = queue.get()
# Do something, anything
if event is not None:
work_time = random.randint(4, 10)
logging.info(f'Started working on:{event} for {work_time}s')
time.sleep(work_time)
logging.info(f'Finish working on:{event}')
class Orchestrator:
def __init__(self, watch_path: str, nproc: int):
self.watch_path = watch_path
self.nproc = nproc
self.queue = mp.Queue()
self.watcher = None
self.workers = []
self.test_params = {}
def setup(self):
# watch_path = pathlib.Path(self.config.watch_directory)
# logging.debug(watch_path)
queue = mp.Queue()
logging.debug(self.watch_path)
self.watcher = mp.Process(target=watch_path2, name='Watcher', args=(queue, self.watch_path))
# self.watcher = mp.Process(target=watch_path2, args=(queue, 'E:\\code\\alchemy\\devenv\\vagrant\\orchestrator\\data_dir'))
# start the watcher process
# watch_path2(queue, self.config.watch_directory)
self.watcher.start()
for i in range(self.nproc):
worker_name = f'Worker{i+1}'
logging.info(f'Spawning process: {worker_name}')
# This is a random set of params sent to each handler. Use this to send models, variables to be loaded
# once.
params = {'name': worker_name, 'test_params': self.test_params}
p = mp.Process(target=self.handle_event, name=worker_name, args=(queue, params,))
# p = mp.Process(target=event_handler, name=worker_name, args=(queue, params,))
self.workers.append(p)
p.start()
def run(self):
""" This function ensures that everything keeps running.
We simply wait for the watcher and the workers to join (which should never happen).
"""
try:
logging.info('Inside run')
self.watcher.join()
logging.info('Watcher joined')
for p in self.workers:
p.join()
self.observer.join()
except KeyboardInterrupt:
# Handle the case when we want to stop the program. Gracefully send SIGTERM to individual processes,
# then everything shuts down.
logging.info("Caught KeyboardInterrupt, terminating all processes")
self.watcher.terminate()
self.watcher.join()
for p in self.workers:
p.terminate()
p.join()
def handle_event(self, queue, params: dict):
# We enter an indefinite loop
while True:
# Wait for new events to happen
event = queue.get()
# Do something, anything
if event is not None:
work_time = random.randint(4, 10)
logging.info(f'Started working on:{event} for {work_time}s')
time.sleep(work_time)
logging.info(f'Finish working on:{event}')
def main():
logging.basicConfig(level='DEBUG', format='%(asctime)s - %(name)s - %(lineno)d - %(processName)s - %(levelname)s - %(message)s')
path = "/home/vagrant/orchestrator/data_dir"
orchestrator = Orchestrator(watch_path=path, nproc=3)
orchestrator.setup()
orchestrator.run()
if __name__ == '__main__':
main()Editor is loading...