From 2d157f6989a5426901ab649e54699092ff5e0f33 Mon Sep 17 00:00:00 2001 From: Aldarien Date: Mon, 16 Nov 2020 23:33:49 -0300 Subject: [PATCH] Implementacion mas completa de email y api para ingresar datos --- Pipfile | 1 + common/helper/logger.py | 14 +-- config/email.json | 12 +-- config/supervisor.json | 3 +- entry/api.py | 46 ++++++++++ src/action/questions.py | 44 --------- src/bosses.py | 30 ++++++- src/email/__init__.py | 1 - src/email/main.py | 42 ++++++--- src/email/supervisor.py | 10 ++- src/email/workers.py | 195 +++++++++++++++++++++++++++++++++------- src/functions.py | 14 ++- src/instrucciones.py | 61 +++++++++++++ src/worker.py | 8 ++ 14 files changed, 369 insertions(+), 112 deletions(-) create mode 100644 entry/api.py delete mode 100644 src/action/questions.py create mode 100644 src/instrucciones.py diff --git a/Pipfile b/Pipfile index 93d0c82..bcf6f42 100644 --- a/Pipfile +++ b/Pipfile @@ -9,6 +9,7 @@ verify_ssl = true keyboard = "*" beautifulsoup4 = "*" pytz = "*" +flask = "*" [requires] python_version = "3.9" diff --git a/common/helper/logger.py b/common/helper/logger.py index 429a03f..67b7139 100644 --- a/common/helper/logger.py +++ b/common/helper/logger.py @@ -3,6 +3,7 @@ import datetime import locale from threading import Thread import queue +from src.functions import dump_queue def get_today(tz): @@ -83,12 +84,11 @@ class Logger: class Worker(Thread): def __init__(self, params, configs): super().__init__() - self.event = params['events']['stop'] + self.event = params['events']['log_stop'] self.queue = params['queues']['log'] self.wait = configs.get('supervisor.wait') self.logger = Logger(params['folders']['log'], configs.get('timezone'), params['logger']['name']) self.logging = params['logging'] - self.queue.put({'is_start': True}) def parse_message(self, message): if 'is_start' in message and message['is_start']: @@ -110,6 +110,7 @@ class Worker(Thread): def run(self): self.logging.log('Starting', caller=type(self)) + self.queue.put({'is_start': True}) while not self.event.is_set(): try: message = self.queue.get(timeout=self.wait) @@ -117,13 +118,6 @@ class Worker(Thread): self.parse_message(message) except queue.Empty: continue - while True: - try: - message = self.queue.get(timeout=self.wait) - self.logging.log('Logger received message', caller=type(self)) - self.parse_message(message) - except queue.Empty: - break + [self.parse_message(message) for message in dump_queue(self.queue, self.wait)] self.logger.stop_log() self.logging.log('Exiting', caller=type(self)) - return diff --git a/config/email.json b/config/email.json index 807f54f..342f965 100644 --- a/config/email.json +++ b/config/email.json @@ -18,13 +18,7 @@ "ssl": true }, "max": 5, - "server": "imap.yandex.com", - "port": 993, - "user": { - "name": "secretary@incoviba.cl", - "password": "quzshqzyfcnydevp" - }, - "username": "secretary@incoviba.cl", - "password": "quzshqzyfcnydevp", - "ssl": true + "consultas": "email_consultas.json", + "spam": "email_spam.json", + "revisados": "email_revisados.json" } \ No newline at end of file diff --git a/config/supervisor.json b/config/supervisor.json index cef5d1f..64c94fa 100644 --- a/config/supervisor.json +++ b/config/supervisor.json @@ -1,3 +1,4 @@ { - "wait": 15 + "wait": 15, + "timezone": "America/Santiago" } \ No newline at end of file diff --git a/entry/api.py b/entry/api.py new file mode 100644 index 0000000..36c79c7 --- /dev/null +++ b/entry/api.py @@ -0,0 +1,46 @@ +from flask import Flask, redirect, url_for +import os +from src.instrucciones import Instrucciones +import json + + +app = Flask(__name__) +data_folder = os.path.join(os.path.realpath('..'), 'data') + + +@app.route('/', methods=['GET']) +def index(): + return { + 'api': { + 'entrypoints': { + 'bosses': [ + 'add', + '/' + ], + 'instructions': [ + 'add', + '/' + ], + 'email': [] + } + } + } + + +@app.route('/instructions/', methods=['GET']) +def instructions(): + instrucciones = Instrucciones(data_folder) + data = {'Instrucciones': [{'Name': i.instruccion, 'Aliases': i.aliases} for i in instrucciones.instrucciones]} + return json.dumps(data) + + +@app.route('/instructions/add//') +def add_instruccion(instruccion, alias): + ins = Instrucciones(data_folder) + ins.add(instruccion, [alias]) + ins.save() + return redirect(url_for('instructions')) + + +if __name__ == '__main__': + app.run(port=8081, debug=True) diff --git a/src/action/questions.py b/src/action/questions.py deleted file mode 100644 index c03f9e4..0000000 --- a/src/action/questions.py +++ /dev/null @@ -1,44 +0,0 @@ -import os -from threading import Thread -import queue -import json - - -class QuestionWorker(Thread): - def __init__(self, params, configs): - super(QuestionWorker, self).__init__() - self.queue = params['queues']['questions'] - self.logging = params['logging'] - self.logger = params['queues']['log'] - self.event = params['events']['stop'] - self.wait = configs.get('supervisor.wait') - self.folder = params['folders']['data'] - - def question(self, text): - filename = os.path.join(self.folder, 'questions.json') - with open(filename, 'w+', encoding='utf8') as f: - try: - data = json.load(f) - except json.decoder.JSONDecodeError: - data = [] - if text in data: - return - data.append(text) - json.dump(data, f, ensure_ascii=False, indent=4) - self.logging.log(text, type(self)) - - def run(self): - self.logging.log('Starting', type(self)) - self.logger.put({'action': type(self)}) - while not self.event.is_set(): - try: - question = self.queue.get(timeout=self.wait) - if question.type == 'email': - self.question( - '¿Que hago con este correo de {0} y el texto dice {1} en la fecha {2}?'. - format(question.sender, question.text, question.datetime.strftime('%d-%m-%Y %H:%M:%S')) - ) - except queue.Empty: - pass - self.logging.log('Exiting', type(self)) - return diff --git a/src/bosses.py b/src/bosses.py index 78c94ad..f054649 100644 --- a/src/bosses.py +++ b/src/bosses.py @@ -12,8 +12,8 @@ class Boss: class Bosses: def __init__(self, data_folder): - filename = os.path.join(data_folder, 'bosses.json') - with open(filename, 'r') as f: + self.filename = os.path.join(data_folder, 'bosses.json') + with open(self.filename, 'r') as f: data = json.load(f) self.bosses = [] addrs = AddressBook(data_folder) @@ -35,3 +35,29 @@ class Bosses: if a in name: return True return False + + def add_boss(self, name, aliases: list = None): + if self.is_boss(name): + return + b = Boss() + b.full_name = name + if aliases is not None: + b.aliases = aliases + + def get(self, name): + if not self.is_boss(name): + return None + for i, boss in enumerate(self.bosses): + if boss.full_name in name: + return i + for m in boss.contact.emails: + if m in name: + return i + for a in boss.aliases: + if a in name: + return i + + def save(self): + data = [{'full_name': boss.full_name, 'aliases': boss.aliases} for boss in self.bosses] + with open(self.filename, 'w') as f: + json.dump(data, f, indent=4) diff --git a/src/email/__init__.py b/src/email/__init__.py index 78e2f67..e69de29 100644 --- a/src/email/__init__.py +++ b/src/email/__init__.py @@ -1 +0,0 @@ -from .workers import Obtenedor, Validador, Confirmador diff --git a/src/email/main.py b/src/email/main.py index 09cf04d..7f1c185 100644 --- a/src/email/main.py +++ b/src/email/main.py @@ -4,12 +4,12 @@ from common.helper.logging import Logging from setup.config import load_config import pytz from src.bosses import Bosses +from src.instrucciones import Instrucciones from src.email.supervisor import Email -def main(args): - configs = load_config(args.config_folder) - configs.set('timezone', pytz.timezone('America/Santiago')) +def set_params(args, configs): + log_name = 'email' params = { 'folders': { 'config': args.config_folder, @@ -17,25 +17,45 @@ def main(args): 'data': args.data_folder }, 'bosses': Bosses(args.data_folder), - 'logging': Logging(configs.get('timezone'), args.log_folder, 'email'), + 'instrucciones': Instrucciones(args.data_folder), + 'logging': Logging(configs.get('timezone'), args.log_folder, log_name), 'logger': { - 'name': 'email' + 'name': log_name + }, + 'filenames': { + 'consultas': os.path.join(args.data_folder, configs.get('email.consultas')), + 'spam': os.path.join(args.data_folder, configs.get('email.spam')), + 'revisados': os.path.join(args.data_folder, configs.get('email.revisados')) } } + return params - setup = { + +def set_setup(): + return { 'workers': [ ('common.helper.logger', 'Worker'), - ('src.email', 'Obtenedor'), - ('src.email', 'Validador'), - ('src.email', 'Confirmador') + ('src.email.workers', 'Obtenedor'), + ('src.email.workers', 'Validador'), + ('src.email.workers', 'Consultador'), + ('src.email.workers', 'Borrador'), + ('src.email.workers', 'Procesador') ], - 'queues': ['log', 'emails', 'valid', 'invalid'], + 'queues': ['log', 'emails', 'valid', 'invalid', 'borrar'], 'events': [], 'locks': [] } - email = Email(configs, params, setup) + +def main(args): + configs = load_config(args.config_folder) + configs.set('timezone', pytz.timezone(configs.get('supervisor.timezone'))) + + params = set_params(args, configs) + + setup = set_setup() + + email = Email(configs=configs, params=params, setup=setup) email.start() email.join() diff --git a/src/email/supervisor.py b/src/email/supervisor.py index 9386ee1..0d5b4b6 100644 --- a/src/email/supervisor.py +++ b/src/email/supervisor.py @@ -6,6 +6,9 @@ import time class Email(Thread): + """ + Email module supervisor thread + """ def __init__(self, configs, params, setup): super(Email, self).__init__() @@ -17,6 +20,7 @@ class Email(Thread): self.worker_status = [] self.add_event('stop') + self.add_event('log_stop') self.setup(setup) @@ -27,8 +31,8 @@ class Email(Thread): self.add_queue(q) for e in data['events']: self.add_event(e) - for l in data['locks']: - self.add_lock(l) + for lo in data['locks']: + self.add_lock(lo) def register_worker(self, module, name): if module not in self.registry: @@ -99,4 +103,6 @@ class Email(Thread): if not self.check_workers(): break time.sleep(self.configs.get('supervisor.wait')) + self.params['logging'].log('Waiting for workers', type(self)) + self.params['events']['log_stop'].set() self.join_workers() diff --git a/src/email/workers.py b/src/email/workers.py index c3f6a66..39edb6b 100644 --- a/src/email/workers.py +++ b/src/email/workers.py @@ -7,15 +7,18 @@ from bs4 import BeautifulSoup import re import email.utils from src.communication import Message +import json +from src.functions import dump_queue class Obtenedor(Worker): """ - Trabajador que obtiene la lista de correos + Trabajador que obtiene la lista de correos del inbox configurado """ def __init__(self, configs, params): super(Obtenedor, self).__init__(configs, params) + self.name = 'Email:Obtenedor' self.url = configs.get('email.imap.server') self.port = configs.get('email.imap.port') user = {'user': '', 'password': ''} @@ -24,7 +27,9 @@ class Obtenedor(Worker): self.user.password = configs.get('email.imap.user.password') self.ssl = configs.get('email.imap.ssl') + self.filename = params['filenames']['revisados'] self.revisados = [] + self.load_revisados() self.queue = params['queues']['emails'] self.frec = configs.get('supervisor.wait') @@ -32,6 +37,28 @@ class Obtenedor(Worker): def is_revisado(self, uid): return uid in self.revisados + def load_revisados(self): + data = [] + try: + with open(self.filename, 'r') as f: + data = json.load(f) + except FileNotFoundError: + pass + self.revisados = data + + def save_revisados(self): + data = [] + try: + with open(self.filename, 'r') as f: + data = json.load(f) + except FileNotFoundError: + pass + for uid in self.revisados: + if uid not in data: + data.append(uid) + with open(self.filename, 'w') as f: + json.dump(data, f) + def add_revisado(self, uid): if self.is_revisado(uid): return @@ -54,8 +81,7 @@ class Obtenedor(Worker): return output def run(self) -> None: - self.logger.log('Starting', type(self)) - self.diary.put({'start_turn': 'Obtenedor'}) + self.start_turn() while not self.stop.is_set(): e = 0 with connect(self.url, self.port, self.user.name, self.user.password, self.ssl) as imap: @@ -77,75 +103,182 @@ class Obtenedor(Worker): self.logger.log('{0} new emails found'.format(e), type(self)) self.diary.put({'message': 'Obtenidos {0} correos nuevos'.format(e)}) time.sleep(self.frec) - self.diary.put({'end_turn': 'Obtenedor'}) - self.logger.log('Exiting', type(self)) + self.save_revisados() + self.end_turn() class Validador(Worker): + """ + Trabajador que valida segun las reglas establecidas + Reglas: + 1. Listado de jefes, con sus correos en la libreta de contactos -> validos + 2. Instrucciones conocidas -> invalidos, pero para revisar + 3. Listado de spam -> borrar + """ def __init__(self, configs, params): super(Validador, self).__init__(configs=configs, params=params) + self.name = 'Email:Validador' self.emails = params['queues']['emails'] self.validos = params['queues']['valid'] self.invalidos = params['queues']['invalid'] + self.borrar = params['queues']['borrar'] self.bosses = params['bosses'] + self.instrucciones = params['instrucciones'] self.frec = configs.get('supervisor.wait') + def validar_bosses(self, sender): + return self.bosses.is_boss(sender) + + def validar_instrucciones(self, message): + return self.instrucciones.is_valid(message.original.message['subject']) + + def validar(self, message): + if self.validar_bosses(message.sender): + self.validos.put(message) + return + if self.validar_instrucciones(message): + self.invalidos.put(message) + return + self.borrar.put(message) + def run(self): - self.logger.log('Starting', type(self)) - self.diary.put({'start_turn': 'Validador'}) + self.start_turn() while not self.stop.is_set(): try: em = self.emails.get(timeout=self.frec) except queue.Empty: continue - if not self.bosses.is_boss(em.sender): - self.invalidos.put(em) - continue - self.validos.put(em) - self.logger.log('Exiting', type(self)) - self.diary.put({'end_turn': 'Validador'}) + self.validar(em) + # Cleanup + [self.validar(em) for em in dump_queue(self.emails, self.frec)] + self.end_turn() -class Confirmador(Worker): +class Consultador(Worker): + """ + Trabajador que registra los correos que no son de jefes para consulta + """ def __init__(self, configs, params): - super(Confirmador, self).__init__(configs=configs, params=params) + super(Consultador, self).__init__(configs=configs, params=params) + self.name = 'Email:Consultador' + self.filename = params['filenames']['consultas'] self.invalidos = params['queues']['invalid'] self.frec = configs.get('supervisor.wait') self.max = configs.get('email.max') self.mensajes = [] - def mandar(self): - self.logger.log('Sending {0} strange emails'.format(len(self.mensajes)), type(self)) - print(self.mensajes) + def is_full(self): + return len(self.mensajes) >= self.max - def crear_mega_mensaje(self, msg): - if len(self.mensajes) >= self.max: - self.mandar() - self.mensajes = [] + def save_messages(self): + data = [] + try: + with open(self.filename, 'r') as f: + data = json.load(f) + except FileNotFoundError: + pass + for m in self.mensajes: + if m not in data: + data.append(m) + with open(self.filename, 'w') as f: + json.dump(data, f, indent=2) + self.mensajes = [] + + def parse_message(self, message): + msg = { + 'sender': message.sender, + 'uid': message.original.uid, + 'time': message.datetime.strftime('%Y-%m-%d %H:%M:%S'), + 'message': message.text + } self.mensajes.append(msg) def run(self) -> None: - self.logger.log('Starting', type(self)) - self.diary.put({'start_turn': 'Confirmador'}) + self.start_turn() while not self.stop.is_set(): try: - em = self.invalidos.get(self.frec) - self.crear_mega_mensaje(em) + em = self.invalidos.get(timeout=self.frec) except queue.Empty: continue - self.logger.log('Exiting', type(self)) - self.diary.put({'end_turn': 'Confirmador'}) + self.parse_message(em) + if self.is_full(): + self.save_messages() + [self.parse_message(message) for message in dump_queue(self.invalidos, self.frec)] + self.save_messages() + self.end_turn() + + +class Borrador(Worker): + """ + Trabajador que borra los correos marcados para borrar + """ + def __init__(self, configs, params): + super(Borrador, self).__init__(configs=configs, params=params) + + self.name = 'Email:Borrador' + + self.queue = params['queues']['borrar'] + self.frec = configs.get('supervisor.wait') + self.max = configs.get('email.max') + + self.url = configs.get('email.imap.server') + self.port = configs.get('email.imap.port') + user = {'user': '', 'password': ''} + self.user = SimpleNamespace(**user) + self.user.name = configs.get('email.imap.user.name') + self.user.password = configs.get('email.imap.user.password') + self.ssl = configs.get('email.imap.ssl') + + self.borrar = [] + + def is_full(self): + return len(self.borrar) >= self.max + + def do_borrar(self): + with connect(self.url, self.port, self.user.name, self.user.password, self.ssl) as imap: + status, msg = imap.select('INBOX') + if status != 'OK': + return + + for uid in self.borrar: + status, ids = imap.uid('store', uid, '+FLAGS', b'\\Deleted') + if status != 'OK': + continue + + imap.expunge() + self.borrar = [] + + def run(self) -> None: + self.start_turn() + while not self.stop.is_set(): + try: + uid = self.queue.get(timeout=self.frec) + except queue.Empty: + continue + self.borrar.append(uid) + if self.is_full(): + self.do_borrar() + # Cleanup + [self.borrar.append(uid) for uid in dump_queue(self.queue, self.frec)] + self.do_borrar() + self.end_turn() class Procesador(Worker): + """ + Trabajador que revisa los correos validos y los procesa de acuerdo a las instrucciones + """ def __init__(self, configs, params): super(Procesador, self).__init__(configs=configs, params=params) + self.name = 'Email:Procesador' + def run(self) -> None: - self.diary.put({'start_turn': 'Procesador'}) - self.diary.put({'end_turn': 'Procesador'}) - pass + self.start_turn() + while not self.stop.is_set(): + continue + self.end_turn() diff --git a/src/functions.py b/src/functions.py index a58e53a..084f775 100644 --- a/src/functions.py +++ b/src/functions.py @@ -1,4 +1,5 @@ import keyboard +import queue as q def exit_thread(stop, logger): @@ -6,4 +7,15 @@ def exit_thread(stop, logger): keyboard.wait('Esc') logger.log('Escape pressed', caller='exit_thread') stop.set() - logger.log('Exit signal sent', caller='exit_thread') \ No newline at end of file + logger.log('Exit signal sent', caller='exit_thread') + + +def dump_queue(queue, wait): + data = [] + while True: + try: + m = queue.get(timeout=wait) + except q.Empty: + break + data.append(m) + return data diff --git a/src/instrucciones.py b/src/instrucciones.py new file mode 100644 index 0000000..f9e5d80 --- /dev/null +++ b/src/instrucciones.py @@ -0,0 +1,61 @@ +import os +import json + + +class Instruccion: + def __init__(self): + self.instruccion = '' + self.aliases = [] + + +class Instrucciones: + def __init__(self, data_folder): + self.filename = os.path.join(data_folder, 'instrucciones.json') + data = [] + try: + with open(self.filename, 'r') as f: + data = json.load(f) + except FileNotFoundError: + pass + + self.instrucciones = [] + for d in data: + i = Instruccion() + i.instruccion = d['instruccion'] + for a in d['aliases']: + i.aliases.append(a) + self.instrucciones.append(i) + + def get(self, instruccion): + if not self.is_valid(instruccion): + return None + for i, ins in enumerate(self.instrucciones): + if instruccion == ins.instruccion: + return i + if instruccion in ins.aliases: + return i + + def is_valid(self, instruccion): + for i in self.instrucciones: + if instruccion == i.instruccion: + return True + if instruccion in i.aliases: + return True + return False + + def add(self, instruccion, aliases: list = None): + if self.is_valid(instruccion): + if aliases is not None: + i = self.get(instruccion) + self.instrucciones[i].aliases = aliases + return + ins = Instruccion() + ins.instruccion = instruccion + if aliases is not None: + ins.aliases = aliases + self.instrucciones.append(ins) + + def save(self): + data = [{'instruccion': i.instruccion, 'aliases': i.aliases} for i in self.instrucciones] + with open(self.filename, 'w') as f: + json.dump(data, f, indent=4) diff --git a/src/worker.py b/src/worker.py index 5110c75..eb230d7 100644 --- a/src/worker.py +++ b/src/worker.py @@ -7,3 +7,11 @@ class Worker(Thread): self.stop = params['events']['stop'] self.diary = params['queues']['log'] self.logger = params['logging'] + + def start_turn(self): + self.diary.put({'start_turn': self.name}) + self.logger.log('Starting', type(self)) + + def end_turn(self): + self.diary.put({'end_turn': self.name}) + self.logger.log('Exiting', type(self))