diff --git a/src/supervisor.py b/src/supervisor.py new file mode 100644 index 0000000..3ee7797 --- /dev/null +++ b/src/supervisor.py @@ -0,0 +1,93 @@ +from threading import Thread, Event, Lock +from queue import Queue +import keyboard +import time +import importlib + + +class Supervisor(Thread): + def __init__(self, configs, params): + super().__init__() + self._configs = configs + self.params = params + self._workers = [] + self._status = [] + self._registry = {} + self.add_event('stop') + + def add_worker(self, worker): + self._workers.append(worker) + + def start_workers(self): + [w.start() for w in self._workers] + [self._status.append(True) for w in self._workers] + + def check_workers(self): + stopped = 0 + for (k, w) in enumerate(self._workers): + if not self._status[k]: + stopped += 1 + continue + if not w.is_alive(): + self.params['logging'].log('Worker {0} stopped'.format(type(w))) + self.params['queues']['log'].put({'not': True, 'action': type(w)}) + stopped += 1 + self._status[k] = False + if stopped == len(self._workers): + return False + return True + + def join_workers(self): + [w.join(self._configs.get('supervisor.wait')) for w in self._workers] + + def add_queue(self, name): + if 'queues' not in self.params or self.params['queues'] is None: + self.params['queues'] = {} + self.params['queues'][name] = Queue() + + def add_event(self, name): + if 'events' not in self.params or self.params['events'] is None: + self.params['events'] = {} + self.params['events'][name] = Event() + + def add_lock(self, name): + if 'locks' not in self.params or self.params['locks'] is None: + self.params['locks'] = {} + self.params['locks'][name] = Lock() + + def register_worker(self, module, name): + if module not in self._registry: + self._registry[module] = [] + self._registry[module].append(name) + + def run(self): + for (module_name, ws) in self._registry.items(): + module = importlib.import_module(module_name) + for w in ws: + worker = getattr(module, w) + self.add_worker(worker(configs=self._configs, params=self.params)) + + self.add_worker(Thread(target=exit_thread, args=(self.params['events']['stop'], self.params['logging']))) + + self.params['queues']['log'].put({'is_start': True}) + self.start_workers() + while not self.params['events']['stop'].is_set(): + self.params['logging'].log('Looping main status {0}'.format(not self.params['events']['stop'].is_set()), + caller=type(self)) + if not self.check_workers(): + break + self.params['logging'].log('Waiting {0} secs'.format(self._configs.get('supervisor.wait')), + caller=type(self)) + time.sleep(self._configs.get('supervisor.wait')) + self.params['logging'].log('Exiting', caller=type(self)) + self.join_workers() + self.params['logging'].log('Waiting for Workers', caller=type(self)) + return + + +def exit_thread(event, logging): + logging.log('Starting exit thread', caller='exit_thread') + keyboard.wait('Esc') + logging.log('Escape pressed', caller='exit_thread') + event.set() + logging.log('Exit signal sent', caller='exit_thread')