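# Untitled
#
# NOTE: the lines below are pastecode.io page metadata captured together with
# this script; they are commented out so the file parses as Python.
# mail@pastecode.io avatar
# unknown
# python
# a year ago
# 5.4 kB
# 1
# Indexable
# Never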
import argparse
import logging
import yaml
import time
import random
import multiprocessing as mp
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver
from watchdog.events import LoggingEventHandler, FileSystemEventHandler
# from orchestrator.typing.orchestrator import OrchestratorConfig

# Logger named after this module. NOTE(review): the code in this file logs via
# the root `logging` module, so this logger is currently not referenced.
logger = logging.getLogger(name=__name__)


class FSEHandler(FileSystemEventHandler):
    """Watchdog event handler that forwards newly created files to a queue.

    Only ``on_created`` is overridden here.
    """

    def __init__(self, queue: mp.Queue):
        """Remember the queue that created-file events are pushed onto.

        Args:
            queue: multiprocessing queue read by the worker processes.
        """
        # Initialise the watchdog base class so any state it sets up exists.
        super().__init__()
        self.queue = queue

    def on_created(self, event):
        """Log every creation event and enqueue it when it refers to a file.

        Args:
            event: the watchdog event describing the new filesystem entry.
        """
        logging.info(f'EventType: {event.event_type}, IsDir: {event.is_directory}, SRC: {event.src_path}')

        # Directories are ignored. The event_type comparison is kept as a
        # defensive check on what gets forwarded to the workers.
        if not event.is_directory and event.event_type == 'created':
            self.queue.put(event)
       

def watch_path2(queue: mp.Queue, path: str, poll_interval: float = 1.0):
    """Watch *path* recursively and push created-file events onto *queue*.

    Intended as the target of the watcher process. A ``PollingObserver`` runs
    in a background thread, while this function's loop only keeps the process
    alive. It returns when the observer thread stops or a KeyboardInterrupt
    arrives. In every case the observer is stopped and joined before returning.

    Args:
        queue: queue handed to ``FSEHandler``, which puts created-file events on it.
        path: directory to watch (recursively).
        poll_interval: seconds to sleep between liveness checks of the
            observer thread (default 1.0, the previous hard-coded value).
    """
    logging.info(f'Starting the watch process: {queue}, path: {path}')
    # Named `handler` so it does not shadow the module-level `event_handler`.
    handler = FSEHandler(queue=queue)

    observer = PollingObserver()
    observer.schedule(handler, path, recursive=True)

    observer.start()
    try:
        logging.info('Inside run')
        i = 0
        # Loop on is_alive() rather than forever: if the observer thread dies,
        # the process exits instead of spinning with nothing being watched.
        while observer.is_alive():
            logging.debug(f'Running.. {i}')
            time.sleep(poll_interval)
            i += 1
        logging.warning('Observer thread stopped; watcher exiting')
    except KeyboardInterrupt:
        # Ctrl+C: shut down quietly; the finally block stops the observer.
        logging.info('Watcher interrupted, stopping observer')
    finally:
        observer.stop()
        observer.join()

def event_handler(queue: mp.Queue, params: dict):
    """Worker loop: take events from *queue* and simulate work on each one.

    A ``None`` item on the queue is a stop sentinel, and the function returns
    when it sees one. This lets a worker process exit cleanly instead of being
    ``terminate()``-d while it may be reading from the shared queue.

    Args:
        queue: queue of events to process.
        params: per-worker parameters; only logged by this loop.
    """
    logging.debug(f'Passed values: {params}, object id: {id(params)}')
    logging.info('Starting new task.')

    while True:
        # Block until the next event arrives.
        logging.info('Waiting for new task.')
        event = queue.get()

        # Stop sentinel: leave the loop so the process can finish normally.
        if event is None:
            logging.info('Received stop sentinel, worker exiting.')
            return

        # Placeholder work: sleep for a random 4-10 seconds.
        work_time = random.randint(4, 10)
        logging.info(f'Started working on:{event} for {work_time}s')
        time.sleep(work_time)
        logging.info(f'Finish  working on:{event}')


class Orchestrator:
    """Run one filesystem-watcher process plus a pool of worker processes.

    The watcher (``watch_path2``) puts created-file events on a shared
    multiprocessing queue. Each worker (``handle_event``) takes events off it.
    """

    def __init__(self, watch_path: str, nproc: int):
        """Configure the orchestrator; no processes are started here.

        Args:
            watch_path: directory handed to the watcher process.
            nproc: number of worker processes started by ``setup()``.
        """
        self.watch_path = watch_path
        self.nproc = nproc

        # The one queue shared by the watcher (producer) and workers (consumers).
        self.queue = mp.Queue()
        self.watcher = None
        self.workers = []
        # Arbitrary data sent to every worker; use it for models or variables
        # that should be loaded only once.
        self.test_params = {}

    def setup(self):
        """Start the watcher process and ``nproc`` worker processes."""
        logging.debug(self.watch_path)
        # All processes must share self.queue; a separate queue here would
        # leave self.queue unused and unreachable for run().
        self.watcher = mp.Process(target=watch_path2, name='Watcher',
                                  args=(self.queue, self.watch_path))
        self.watcher.start()

        for i in range(self.nproc):
            worker_name = f'Worker{i+1}'
            logging.info(f'Spawning process: {worker_name}')
            params = {'name': worker_name, 'test_params': self.test_params}
            # handle_event is a staticmethod, so this target does not carry
            # `self` (including already-started Process objects) through
            # pickling under the 'spawn' start method.
            p = mp.Process(target=self.handle_event, name=worker_name,
                           args=(self.queue, params))
            self.workers.append(p)
            p.start()

    def run(self):
        """Block until the watcher and every worker have exited.

        Normally the watcher never exits, so this blocks indefinitely. If the
        watcher does exit, no more events can be produced. One ``None`` stop
        sentinel per worker is then queued behind any pending events, so the
        workers drain the queue and return. On KeyboardInterrupt every process
        is terminated and joined.

        Raises:
            RuntimeError: if called before ``setup()``.
        """
        if self.watcher is None:
            raise RuntimeError('setup() must be called before run()')

        try:
            logging.info('Inside run')
            self.watcher.join()
            logging.info('Watcher joined')

            # Producer is gone: tell each worker to stop once the queue drains.
            for _ in self.workers:
                self.queue.put(None)
            for p in self.workers:
                p.join()

        except KeyboardInterrupt:
            # Send SIGTERM to each process individually, then reap it.
            logging.info("Caught KeyboardInterrupt, terminating all processes")
            self.watcher.terminate()
            self.watcher.join()

            for p in self.workers:
                p.terminate()
                p.join()

    @staticmethod
    def handle_event(queue, params: dict):
        """Worker loop: process events from *queue* until a ``None`` sentinel.

        Declared static so using it as a ``Process`` target does not require
        pickling the Orchestrator instance.

        Args:
            queue: queue of events to process.
            params: per-worker parameters (not used by this loop).
        """
        while True:
            event = queue.get()

            # Stop sentinel queued by run(): return so the process ends cleanly.
            if event is None:
                return

            # Placeholder work: sleep for a random 4-10 seconds.
            work_time = random.randint(4, 10)
            logging.info(f'Started working on:{event} for {work_time}s')
            time.sleep(work_time)
            logging.info(f'Finish  working on:{event}')
                

def main(argv=None):
    """Parse command-line options, then set up and run the orchestrator.

    With no arguments the configuration matches the previous hard-coded one:
    watch /home/vagrant/orchestrator/data_dir with 3 workers at DEBUG level.

    Args:
        argv: argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    import os

    parser = argparse.ArgumentParser(
        description='Watch a directory and dispatch new files to worker processes.')
    parser.add_argument('--watch-path', default='/home/vagrant/orchestrator/data_dir',
                        help='directory to watch recursively (default: %(default)s)')
    parser.add_argument('--nproc', type=int, default=3,
                        help='number of worker processes (default: %(default)s)')
    parser.add_argument('--log-level', default='DEBUG',
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
                        help='logging level (default: %(default)s)')
    args = parser.parse_args(argv)

    # Fail fast in the parent rather than inside a child process.
    if args.nproc < 1:
        parser.error('--nproc must be at least 1')
    if not os.path.isdir(args.watch_path):
        parser.error(f'watch path is not a directory: {args.watch_path}')

    logging.basicConfig(level=args.log_level, format='%(asctime)s - %(name)s - %(lineno)d - %(processName)s - %(levelname)s - %(message)s')

    orchestrator = Orchestrator(watch_path=args.watch_path, nproc=args.nproc)
    orchestrator.setup()
    orchestrator.run()


if __name__ == '__main__':
    main()