"""Entry point and supervisor process for the e-mail workers.

Running this module as a script parses the folder options, loads the
configuration and starts an :class:`Email` supervisor process, which in turn
starts the configured workers and restarts any that stop.
"""
from multiprocessing import Process
from queue import Queue
from threading import Thread, Event, Lock
import argparse
import importlib
import os
from common.helper.logging import Logging
from setup.config import load_config
import pytz
from src.bosses import Bosses
import keyboard
import time


class Email(Process):
    """Supervisor process that starts, monitors and restarts workers.

    Shared primitives are stored in ``params`` under the ``'queues'``,
    ``'events'`` and ``'locks'`` keys, each a dict keyed by name.

    ``workers``, ``working`` and ``worker_status`` are parallel lists: entry
    ``k`` of each one describes the same started worker.

    NOTE(review): ``params`` holds ``threading``/``queue`` primitives, which
    cannot be pickled; this appears to rely on the 'fork' start method so the
    child process inherits them -- confirm for the target platform.
    """

    def __init__(self, configs, params, setup):
        """Store the configuration and apply the initial ``setup``.

        :param configs: configuration object; ``configs.get(key)`` is used to
            read ``'supervisor.wait'``.
        :param params: dict of shared objects (must contain ``'logging'``).
        :param setup: dict with ``'workers'``, ``'queues'``, ``'events'`` and
            ``'locks'`` entries (see :meth:`setup`), or ``None`` to skip.
        """
        super().__init__()
        self.configs = configs
        self.params = params
        self.registry = {}
        self.workers = []
        self.working = []
        self.worker_status = []
        self.add_event('stop')
        # The original accepted ``setup`` but never applied it, leaving the
        # registry empty and ``params['queues']`` undefined.
        if setup is not None:
            self.setup(setup)

    def setup(self, data):
        """Register the workers and create the queues, events and locks.

        :param data: dict with ``'workers'`` (iterable of ``(module, name)``
            pairs) and ``'queues'``, ``'events'``, ``'locks'`` (iterables of
            names).
        """
        for (m, n) in data['workers']:
            self.register_worker(m, n)
        for q in data['queues']:
            self.add_queue(q)
        for e in data['events']:
            self.add_event(e)
        for l in data['locks']:
            self.add_lock(l)

    def register_worker(self, module, name):
        """Remember that worker ``name`` from ``module`` must be started."""
        if module not in self.registry:
            self.registry[module] = []
        self.registry[module].append(name)

    def add_worker(self, worker):
        """Append ``worker`` to the list of supervised workers."""
        self.workers.append(worker)

    def start_worker(self, module, name):
        """Resolve, start and record the worker ``name`` from ``module``.

        :param module: dotted module path (imported on demand) or an already
            imported module object.
        :param name: attribute of the module that provides the worker.
        """
        # The registry is filled with dotted paths, so the module has to be
        # imported before the attribute lookup.
        if isinstance(module, str):
            module_obj = importlib.import_module(module)
        else:
            module_obj = module
        target = getattr(module_obj, name)
        if isinstance(target, type):
            # NOTE(review): assumes worker classes are constructed as
            # ``Worker(configs, params)`` -- confirm against their definitions.
            worker = target(self.configs, self.params)
        else:
            # Already an instance exposing ``start``; use it as is.
            worker = target
        # Start first so a failing start cannot misalign the parallel lists.
        worker.start()
        self.add_worker(worker)
        self.working.append((module, name))
        self.worker_status.append(True)

    def start_workers(self):
        """Start every worker registered through :meth:`register_worker`."""
        for module, names in self.registry.items():
            for name in names:
                self.start_worker(module, name)

    def check_workers(self):
        """Restart workers that stopped since the previous check.

        A stopped worker is logged, reported on the ``'log'`` queue, marked
        as stopped and started again as a new entry.

        :return: ``False`` when the number of stopped entries equals the
            number of tracked workers, ``True`` otherwise.
        """
        stopped = 0
        # Iterate over a snapshot: restarting appends to ``self.workers``.
        for index, worker in list(enumerate(self.workers)):
            if not self.worker_status[index]:
                stopped += 1
                continue
            if worker.is_alive():
                continue
            self.params['logging'].log('Worker {0} stopped'.format(type(worker)))
            self.params['queues']['log'].put({'not': True, 'action': type(worker)})
            stopped += 1
            self.worker_status[index] = False
            module, name = self.working[index]
            # Restart worker
            self.start_worker(module, name)
        return stopped != len(self.workers)

    def join_workers(self):
        """Join every live worker, waiting ``'supervisor.wait'`` for each."""
        timeout = self.configs.get('supervisor.wait')
        for worker in self.workers:
            if worker.is_alive():
                worker.join(timeout)

    def _add_shared(self, kind, name, factory):
        """Create ``factory()`` and store it at ``params[kind][name]``."""
        if kind not in self.params or self.params[kind] is None:
            self.params[kind] = {}
        self.params[kind][name] = factory()

    def add_queue(self, name):
        """Create a ``Queue`` stored at ``params['queues'][name]``."""
        self._add_shared('queues', name, Queue)

    def add_event(self, name):
        """Create an ``Event`` stored at ``params['events'][name]``."""
        self._add_shared('events', name, Event)

    def add_lock(self, name):
        """Create a ``Lock`` stored at ``params['locks'][name]``."""
        self._add_shared('locks', name, Lock)

    def run(self) -> None:
        """Start the workers and supervise them until the stop event is set.

        The loop also ends when :meth:`check_workers` returns ``False``.
        Live workers are joined before returning.
        """
        stop = self.params['events']['stop']
        self.start_workers()
        # The exit watcher is kept out of ``self.workers``: it has no
        # ``working``/``worker_status`` entry and must not be restarted.
        # Daemon, so a pending ``keyboard.wait`` cannot keep the process alive.
        exit_watcher = Thread(
            target=exit_thread,
            args=(stop, self.params['logging']),
            daemon=True,
        )
        exit_watcher.start()
        while not stop.is_set():
            if not self.check_workers():
                break
            time.sleep(self.configs.get('supervisor.wait'))
        self.join_workers()


def exit_thread(stop, logger):
    """Wait for the Esc key and then set the ``stop`` event.

    :param stop: event to set once Esc has been pressed.
    :param logger: object whose ``log(message, caller=...)`` records progress.
    """
    logger.log('Starting exit thread', caller='exit_thread')
    keyboard.wait('Esc')
    logger.log('Escape pressed', caller='exit_thread')
    stop.set()
    logger.log('Exit signal sent', caller='exit_thread')


def main(args):
    """Build the configuration and run the supervisor until it finishes.

    :param args: parsed arguments providing ``config_folder``,
        ``log_folder`` and ``data_folder``.
    """
    configs = load_config(args.config_folder)
    configs.set('timezone', pytz.timezone('America/Santiago'))
    params = {
        'folders': {
            'config': args.config_folder,
            'log': args.log_folder,
            'data': args.data_folder
        },
        'bosses': Bosses(args.data_folder),
        'logging': Logging(configs.get('timezone'), args.log_folder)
    }
    setup = {
        'workers': [
            ('common.helper.logger', 'Worker'),
            ('src.email', 'Obtenedor'),
            ('src.email', 'Validador')
        ],
        'queues': ['log', 'emails', 'valid', 'invalid'],
        'events': [],
        'locks': []
    }
    email = Email(configs, params, setup)
    email.start()
    email.join()


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--config_folder', default=os.path.join(os.path.realpath('../..'), 'config'))
    parser.add_argument('-d', '--data_folder', default=os.path.join(os.path.realpath('../..'), 'data'))
    parser.add_argument('-l', '--log_folder', default=os.path.join(os.path.realpath('../..'), 'logs'))
    _args = parser.parse_args()
    main(_args)