From 9adafbae13f35645ebd5cb188c03ff7f4e798286 Mon Sep 17 00:00:00 2001
From: Aldarien
Date: Thu, 12 Nov 2020 00:35:04 -0300
Subject: [PATCH] Src

---
 src/{text => communication}/message.py |   0
 src/email/__init__.py                  |   2 +
 src/email/main.py                      | 164 +++++++++++++++++++++++++
 src/email/obtenedor.py                 |  79 ++++++++++++
 src/email/revisor_worker.py            |   2 +-
 src/email/validador.py                 |  33 +++++
 src/worker.py                          |  12 ++
 7 files changed, 291 insertions(+), 1 deletion(-)
 rename src/{text => communication}/message.py (100%)
 create mode 100644 src/email/main.py
 create mode 100644 src/email/obtenedor.py
 create mode 100644 src/email/validador.py
 create mode 100644 src/worker.py

diff --git a/src/text/message.py b/src/communication/message.py
similarity index 100%
rename from src/text/message.py
rename to src/communication/message.py
diff --git a/src/email/__init__.py b/src/email/__init__.py
index 31bb193..000c687 100644
--- a/src/email/__init__.py
+++ b/src/email/__init__.py
@@ -1,2 +1,4 @@
 from .email_interpreter import EmailInterpreter
 from .revisor_worker import RevisorEmailWorker
+from .obtenedor import Obtenedor
+from .validador import Validador
diff --git a/src/email/main.py b/src/email/main.py
new file mode 100644
index 0000000..9fccf26
--- /dev/null
+++ b/src/email/main.py
@@ -0,0 +1,164 @@
+from multiprocessing import Process
+from queue import Queue
+from threading import Thread, Event, Lock
+import argparse
+import importlib
+import os
+from common.helper.logging import Logging
+from setup.config import load_config
+import pytz
+from src.bosses import Bosses
+import keyboard
+import time
+
+
+class Email(Process):
+    """
+    Supervisor process that starts the registered workers and restarts the ones that die
+    """
+    def __init__(self, configs, params, setup):
+        super(Email, self).__init__()
+
+        self.configs = configs
+        self.params = params
+        self.registry = {}
+        self.workers = []
+        self.working = []
+        self.worker_status = []
+
+        self.add_event('stop')
+        # Register the workers and create the queues, events and locks they share
+        self.setup(setup)
+
+    def setup(self, data):
+        for (m, n) in data['workers']:
+            self.register_worker(m, n)
+        for q in data['queues']:
+            self.add_queue(q)
+        for e in data['events']:
+            self.add_event(e)
+        for l in data['locks']:
+            self.add_lock(l)
+
+    def register_worker(self, module, name):
+        if module not in self.registry:
+            self.registry[module] = []
+        self.registry[module].append(name)
+
+    def add_worker(self, worker):
+        self.workers.append(worker)
+
+    def start_worker(self, module, name):
+        """
+        Import module, instantiate its class name and start the new worker
+        """
+        worker_class = getattr(importlib.import_module(module), name)
+        # Assumes every worker takes (configs, params), like src.worker.Worker
+        worker = worker_class(self.configs, self.params)
+        self.add_worker(worker)
+        self.working.append((module, name))
+        self.worker_status.append(True)
+        worker.start()
+
+    def start_workers(self):
+        for module, names in self.registry.items():
+            for name in names:
+                self.start_worker(module, name)
+
+    def check_workers(self):
+        """
+        Restart the workers that died since the last check
+
+        :return: False when no worker is left running, True otherwise
+        """
+        # Iterate over a copy: restarting a worker appends to self.workers
+        for (k, w) in enumerate(list(self.workers)):
+            if not self.worker_status[k] or w.is_alive():
+                continue
+            self.params['logging'].log('Worker {0} stopped'.format(type(w)))
+            self.params['queues']['log'].put({'not': True, 'action': type(w)})
+            self.worker_status[k] = False
+            # Restart worker
+            self.start_worker(*self.working[k])
+        return any(self.worker_status)
+
+    def join_workers(self):
+        wait = self.configs.get('supervisor.wait')
+        for w in self.workers:
+            if w.is_alive():
+                w.join(wait)
+
+    def add_queue(self, name):
+        if 'queues' not in self.params or self.params['queues'] is None:
+            self.params['queues'] = {}
+        self.params['queues'][name] = Queue()
+
+    def add_event(self, name):
+        if 'events' not in self.params or self.params['events'] is None:
+            self.params['events'] = {}
+        self.params['events'][name] = Event()
+
+    def add_lock(self, name):
+        if 'locks' not in self.params or self.params['locks'] is None:
+            self.params['locks'] = {}
+        self.params['locks'][name] = Lock()
+
+    def run(self) -> None:
+        self.start_workers()
+        # The exit thread is not a restartable worker, so it stays out of self.workers.
+        # It is a daemon because keyboard.wait blocks until Esc is pressed.
+        Thread(target=exit_thread, args=(self.params['events']['stop'], self.params['logging']),
+               daemon=True).start()
+
+        while not self.params['events']['stop'].is_set():
+            if not self.check_workers():
+                break
+            time.sleep(self.configs.get('supervisor.wait'))
+        self.join_workers()
+
+
+def exit_thread(stop, logger):
+    logger.log('Starting exit thread', caller='exit_thread')
+    keyboard.wait('Esc')
+    logger.log('Escape pressed', caller='exit_thread')
+    stop.set()
+    logger.log('Exit signal sent', caller='exit_thread')
+
+
+def main(args):
+    configs = load_config(args.config_folder)
+    configs.set('timezone', pytz.timezone('America/Santiago'))
+    params = {
+        'folders': {
+            'config': args.config_folder,
+            'log': args.log_folder,
+            'data': args.data_folder
+        },
+        'bosses': Bosses(args.data_folder),
+        'logging': Logging(configs.get('timezone'), args.log_folder)
+    }
+
+    setup = {
+        'workers': [
+            ('common.helper.logger', 'Worker'),
+            ('src.email', 'Obtenedor'),
+            ('src.email', 'Validador')
+        ],
+        'queues': ['log', 'emails', 'valid', 'invalid'],
+        'events': [],
+        'locks': []
+    }
+
+    email = Email(configs, params, setup)
+
+    email.start()
+    email.join()
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-c', '--config_folder', default=os.path.join(os.path.realpath('../..'), 'config'))
+    parser.add_argument('-d', '--data_folder', default=os.path.join(os.path.realpath('../..'), 'data'))
+    parser.add_argument('-l', '--log_folder', default=os.path.join(os.path.realpath('../..'), 'logs'))
+    _args = parser.parse_args()
+    main(_args)
diff --git a/src/email/obtenedor.py b/src/email/obtenedor.py
new file mode 100644
index 0000000..407bba6
--- /dev/null
+++ b/src/email/obtenedor.py
@@ -0,0 +1,79 @@
+from src.worker import Worker
+from types import SimpleNamespace
+from entry.email.inbox import connect, check_inbox
+import re
+from bs4 import BeautifulSoup
+import email.utils
+from src.communication.message import Message
+import time
+
+
+class Obtenedor(Worker):
+    """
+    Worker that fetches the list of emails
+    """
+    def __init__(self, configs, params):
+        super(Obtenedor, self).__init__(configs, params)
+
+        self.url = configs.get('email.server')
+        self.port = configs.get('email.port')
+        self.user = SimpleNamespace(name=configs.get('email.user.name'),
+                                    password=configs.get('email.user.password'))
+        self.ssl = configs.get('email.ssl')
+
+        # A set keeps the "already reviewed" lookup O(1) as the inbox grows
+        self.revisados = set()
+
+        self.queue = params['queues']['emails']
+        self.frec = configs.get('supervisor.wait')
+
+    def is_revisado(self, uid):
+        return uid in self.revisados
+
+    def add_revisado(self, uid):
+        self.revisados.add(uid)
+
+    def build_message(self, email_part):
+        """
+        Extract the text of an email as a flat list with one string per non-multipart part
+        """
+        if email_part.is_multipart():
+            output = []
+            for part in email_part.get_payload():
+                # extend, not append: nested lists would break the ' '.join in run
+                output.extend(self.build_message(part))
+            return output
+        html = email_part.get_payload(decode=True)
+        bs = BeautifulSoup(html, 'html.parser')
+        if bs.body:
+            html = bs.body.get_text()
+        else:
+            html = bs.get_text()
+        html = re.sub(' +', ' ', re.sub("\n+", ' ', html)).strip(' ')
+        return [html]
+
+    def run(self) -> None:
+        self.logger.log('Starting', type(self))
+        self.diary.put({'action': 'Inicio de jornada de Obtenedor'})
+        while not self.stop.is_set():
+            e = 0
+            with connect(self.url, self.port, self.user.name, self.user.password, self.ssl) as imap:
+                self.logger.log('Getting emails', type(self))
+                # No inbox result counts as no emails, so the wait below still happens
+                emails = check_inbox(imap) or []
+                for em in emails:
+                    if self.is_revisado(em.uid):
+                        continue
+                    sender = em.message['from']
+                    # A missing Subject header comes back as None
+                    subject = em.message['subject'] or ''
+                    text = ' '.join([subject + '.'] + self.build_message(em.message))
+                    msg = Message('email', text=text, original=em, sender=sender,
+                                  datetime=email.utils.parsedate_to_datetime(em.message['Date']))
+                    self.queue.put(msg)
+                    self.add_revisado(em.uid)
+                    e += 1
+            self.diary.put({'action': 'Obtenidos {0} correos nuevos'.format(e)})
+            time.sleep(self.frec)
+        self.logger.log('Exiting', type(self))
+        self.diary.put({'action': 'Terminando el turno de Obtenedor'})
diff --git a/src/email/revisor_worker.py b/src/email/revisor_worker.py
index f6e3088..4310507 100644
--- a/src/email/revisor_worker.py
+++ b/src/email/revisor_worker.py
@@ -4,7 +4,7 @@ import time
 import email.utils
 from bs4 import BeautifulSoup
 from entry.email.inbox import connect, check_inbox
-from src.text.message import Message
+from src.communication.message import Message
 
 
 class RevisorEmailWorker(Thread):
diff --git a/src/email/validador.py b/src/email/validador.py
new file mode 100644
index 0000000..e0cdfb2
--- /dev/null
+++ b/src/email/validador.py
@@ -0,0 +1,33 @@
+from queue import Empty
+from src.worker import Worker
+
+
+class Validador(Worker):
+    """
+    Worker that sorts the fetched emails into valid (sent by a boss) and invalid
+    """
+    def __init__(self, configs, params):
+        super(Validador, self).__init__(configs, params)
+
+        self.emails = params['queues']['emails']
+        self.validos = params['queues']['valid']
+        self.invalidos = params['queues']['invalid']
+        self.bosses = params['bosses']
+
+        self.frec = configs.get('supervisor.wait')
+
+    def run(self):
+        self.logger.log('Starting', type(self))
+        self.diary.put({'action': 'Inicio de jornada de Validador'})
+        while not self.stop.is_set():
+            try:
+                em = self.emails.get(timeout=self.frec)
+            except Empty:
+                # Nothing arrived within the timeout: check the stop event again
+                continue
+            if self.bosses.is_boss(em.sender):
+                self.validos.put(em)
+            else:
+                self.invalidos.put(em)
+        self.logger.log('Exiting', type(self))
+        self.diary.put({'action': 'Terminando la jornada de Validador'})
diff --git a/src/worker.py b/src/worker.py
new file mode 100644
index 0000000..5110c75
--- /dev/null
+++ b/src/worker.py
@@ -0,0 +1,12 @@
+from threading import Thread
+
+
+class Worker(Thread):
+    """
+    Base thread for the workers: keeps the stop event, the log queue and the logger
+    """
+    def __init__(self, configs, params):
+        super(Worker, self).__init__()
+        self.stop = params['events']['stop']
+        self.diary = params['queues']['log']
+        self.logger = params['logging']