Supervisor de trabajadores
This commit is contained in:
93
src/supervisor.py
Normal file
93
src/supervisor.py
Normal file
@ -0,0 +1,93 @@
|
||||
from threading import Thread, Event, Lock
|
||||
from queue import Queue
|
||||
import keyboard
|
||||
import time
|
||||
import importlib
|
||||
|
||||
|
||||
class Supervisor(Thread):
|
||||
def __init__(self, configs, params):
|
||||
super().__init__()
|
||||
self._configs = configs
|
||||
self.params = params
|
||||
self._workers = []
|
||||
self._status = []
|
||||
self._registry = {}
|
||||
self.add_event('stop')
|
||||
|
||||
def add_worker(self, worker):
|
||||
self._workers.append(worker)
|
||||
|
||||
def start_workers(self):
|
||||
[w.start() for w in self._workers]
|
||||
[self._status.append(True) for w in self._workers]
|
||||
|
||||
def check_workers(self):
|
||||
stopped = 0
|
||||
for (k, w) in enumerate(self._workers):
|
||||
if not self._status[k]:
|
||||
stopped += 1
|
||||
continue
|
||||
if not w.is_alive():
|
||||
self.params['logging'].log('Worker {0} stopped'.format(type(w)))
|
||||
self.params['queues']['log'].put({'not': True, 'action': type(w)})
|
||||
stopped += 1
|
||||
self._status[k] = False
|
||||
if stopped == len(self._workers):
|
||||
return False
|
||||
return True
|
||||
|
||||
def join_workers(self):
|
||||
[w.join(self._configs.get('supervisor.wait')) for w in self._workers]
|
||||
|
||||
def add_queue(self, name):
|
||||
if 'queues' not in self.params or self.params['queues'] is None:
|
||||
self.params['queues'] = {}
|
||||
self.params['queues'][name] = Queue()
|
||||
|
||||
def add_event(self, name):
|
||||
if 'events' not in self.params or self.params['events'] is None:
|
||||
self.params['events'] = {}
|
||||
self.params['events'][name] = Event()
|
||||
|
||||
def add_lock(self, name):
|
||||
if 'locks' not in self.params or self.params['locks'] is None:
|
||||
self.params['locks'] = {}
|
||||
self.params['locks'][name] = Lock()
|
||||
|
||||
def register_worker(self, module, name):
|
||||
if module not in self._registry:
|
||||
self._registry[module] = []
|
||||
self._registry[module].append(name)
|
||||
|
||||
def run(self):
|
||||
for (module_name, ws) in self._registry.items():
|
||||
module = importlib.import_module(module_name)
|
||||
for w in ws:
|
||||
worker = getattr(module, w)
|
||||
self.add_worker(worker(configs=self._configs, params=self.params))
|
||||
|
||||
self.add_worker(Thread(target=exit_thread, args=(self.params['events']['stop'], self.params['logging'])))
|
||||
|
||||
self.params['queues']['log'].put({'is_start': True})
|
||||
self.start_workers()
|
||||
while not self.params['events']['stop'].is_set():
|
||||
self.params['logging'].log('Looping main status {0}'.format(not self.params['events']['stop'].is_set()),
|
||||
caller=type(self))
|
||||
if not self.check_workers():
|
||||
break
|
||||
self.params['logging'].log('Waiting {0} secs'.format(self._configs.get('supervisor.wait')),
|
||||
caller=type(self))
|
||||
time.sleep(self._configs.get('supervisor.wait'))
|
||||
self.params['logging'].log('Exiting', caller=type(self))
|
||||
self.join_workers()
|
||||
self.params['logging'].log('Waiting for Workers', caller=type(self))
|
||||
return
|
||||
|
||||
|
||||
def exit_thread(event, logging):
|
||||
logging.log('Starting exit thread', caller='exit_thread')
|
||||
keyboard.wait('Esc')
|
||||
logging.log('Escape pressed', caller='exit_thread')
|
||||
event.set()
|
||||
logging.log('Exit signal sent', caller='exit_thread')
|
Reference in New Issue
Block a user