1
|
|
|
# vi:si:et:sw=4:sts=4:ts=4 |
2
|
|
|
|
3
|
|
|
import asyncio |
4
|
|
|
from concurrent.futures import ThreadPoolExecutor |
5
|
|
|
import logging |
6
|
|
|
import os |
7
|
|
|
import signal |
8
|
|
|
|
9
|
|
|
from . import __version__ |
10
|
|
|
from .dispatcher import LoaferDispatcher |
|
|
|
|
11
|
|
|
from .aws.consumer import Consumer as AWSConsumer |
|
|
|
|
12
|
|
|
|
13
|
|
|
|
14
|
|
|
logger = logging.getLogger(__name__) |
15
|
|
|
|
16
|
|
|
|
17
|
|
|
class LoaferManager(object):
    """Own the event loop, thread pool, consumers and dispatcher of a run.

    The constructor wires everything to the current asyncio event loop,
    ``start()`` blocks running that loop, and ``stop()`` tears it down.
    ``stop`` is also registered as the SIGINT/SIGTERM handler.
    """

    def __init__(self, source, thread_pool_size=4):
        """Prepare the loop, the executor and the consumer list.

        Args:
            source: Passed as the first argument to ``AWSConsumer``
                (presumably the queue to consume from; confirm against
                the consumer implementation).
            thread_pool_size: Number of worker threads of the executor
                installed as the loop's default executor.
        """
        self._loop = asyncio.get_event_loop()
        self._loop.add_signal_handler(signal.SIGINT, self.stop)
        self._loop.add_signal_handler(signal.SIGTERM, self.stop)

        # XXX: See https://github.com/python/asyncio/issues/258
        # The minimum value depends on the number of cores in the machine
        # See https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
        self.thread_pool_size = thread_pool_size
        self._executor = ThreadPoolExecutor(self.thread_pool_size)
        self._loop.set_default_executor(self._executor)

        self.routes = []
        consumer_options = {'WaitTimeSeconds': 5, 'MaxNumberOfMessages': 5}
        self.consumers = [
            AWSConsumer(source, consumer_options, loop=self._loop),
        ]

        # Both are populated by start(); keeping them defined here lets
        # stop() run safely even if start() was never called.
        self._dispatcher = None
        self._future = None

    def get_dispatcher(self):
        """Return a new ``LoaferDispatcher`` for the current routes/consumers."""
        return LoaferDispatcher(self.routes, self.consumers)

    def start(self):
        """Schedule the consumers and run the event loop until stopped.

        Blocks in ``run_forever()``; the loop is always closed on the way
        out, whether it was stopped normally or an exception propagated.
        """
        start_message = 'Starting Loafer - Version: {} (pid={}) ...'
        logger.info(start_message.format(__version__, os.getpid()))

        self._dispatcher = self.get_dispatcher()

        self._future = asyncio.gather(self._dispatcher.dispatch_consumers())
        self._future.add_done_callback(self.on_future__errors)

        try:
            self._loop.run_forever()
        finally:
            self._loop.close()

    def stop(self, *args, **kwargs):
        """Stop consumers, cancel pending work and stop the event loop.

        Accepts and ignores arbitrary arguments so it can be used as a
        callback. Safe to call before ``start()``: the steps that need
        the dispatcher or the scheduled future are skipped when those do
        not exist yet.
        """
        logger.info('Stopping Loafer ...')

        if self._dispatcher is not None:
            logger.debug('Stopping consumers ...')
            self._dispatcher.stop_consumers()

        if self._future is not None:
            logger.debug('Cancel schedulled operations ...')
            self._future.cancel()

        logger.debug('Waiting to shutdown ...')
        self._executor.shutdown(wait=True)
        self._loop.stop()

    def on_future__errors(self, future):
        """Done-callback: shut down if the scheduled work failed.

        Args:
            future: The completed future this callback was attached to.
        """
        # stop() cancels the future, which triggers this callback again;
        # exception() raises CancelledError on a cancelled future, so a
        # cancellation must be treated as a normal shutdown, not queried.
        if future.cancelled():
            return

        exc = future.exception()
        # Unhandled errors crashes the event loop execution
        if isinstance(exc, BaseException):
            logger.critical('Fatal error caught: {!r}'.format(exc))
            self.stop()
71
|
|
|
|
This can be caused by one of the following:
1. Missing Dependencies
This error could indicate a Pylint configuration issue. Make sure that your libraries are available by adding the necessary commands.
2. Missing __init__.py files
This error could also result from missing `__init__.py` files in your module folders. Make sure that you place one such file in each sub-folder.