from threading import Thread, Event, Lock from queue import Queue import importlib from src.functions import exit_thread import time class Email(Thread): """ Email module supervisor thread """ def __init__(self, configs, params, setup): super(Email, self).__init__() self.configs = configs self.params = params self.registry = {} self.workers = [] self.working = [] self.worker_status = [] self.add_event('stop') self.add_event('log_stop') self.setup(setup) def setup(self, data): for (m, n) in data['workers']: self.register_worker(m, n) for q in data['queues']: self.add_queue(q) for e in data['events']: self.add_event(e) for lo in data['locks']: self.add_lock(lo) def register_worker(self, module, name): if module not in self.registry: self.registry[module] = [] self.registry[module].append(name) def add_worker(self, worker): self.workers.append(worker) def start_worker(self, module, name): worker = getattr(module, name) worker = worker(configs=self.configs, params=self.params) self.add_worker(worker) self.working.append((module, name)) worker.start() self.worker_status.append(True) def start_workers(self): for module_name, workers in self.registry.items(): module = importlib.import_module(module_name) for name in workers: self.start_worker(module, name) def check_workers(self): stopped = 0 for (k, w) in enumerate(self.workers): if not self.worker_status[k]: stopped += 1 continue if not w.is_alive(): self.params['logging'].log('Worker {0} stopped'.format(type(w))) self.params['queues']['log'].put({'not': True, 'action': type(w)}) stopped += 1 self.worker_status[k] = False (m, n) = self.working[k] # Restart worker self.start_worker(m, n) if stopped == len(self.workers): return False return True def join_workers(self): [w.join(self.configs.get('supervisor.wait')) for w in self.workers if w.is_alive()] def add_queue(self, name): if 'queues' not in self.params or self.params['queues'] is None: self.params['queues'] = {} self.params['queues'][name] = Queue() def add_event(self, name): if 'events' not in self.params or self.params['events'] is None: self.params['events'] = {} self.params['events'][name] = Event() def add_lock(self, name): if 'locks' not in self.params or self.params['locks'] is None: self.params['locks'] = {} self.params['locks'][name] = Lock() def run(self) -> None: self.start_workers() worker = Thread(target=exit_thread, args=(self.params['events']['stop'], self.params['logging'])) worker.start() while not self.params['events']['stop'].is_set(): if not self.check_workers(): break time.sleep(self.configs.get('supervisor.wait')) self.params['logging'].log('Waiting for workers', type(self)) self.params['events']['log_stop'].set() self.join_workers() worker.join()