1
|
|
|
# -*- coding: utf-8 -*- |
2
|
|
|
# vi:si:et:sw=4:sts=4:ts=4 |
3
|
|
|
|
4
|
|
|
import asyncio |
5
|
|
|
from concurrent.futures import ThreadPoolExecutor |
6
|
|
|
import logging |
7
|
|
|
import os |
8
|
|
|
import signal |
9
|
|
|
|
10
|
|
|
from . import __version__ |
11
|
|
|
from .dispatcher import LoaferDispatcher |
|
|
|
|
12
|
|
|
from .aws.consumer import Consumer as AWSConsumer |
|
|
|
|
13
|
|
|
|
14
|
|
|
|
15
|
|
|
logger = logging.getLogger(__name__) |
16
|
|
|
|
17
|
|
|
|
18
|
|
|
class LoaferManager(object):
    """Wire an event loop, a thread pool and consumers together and run them.

    The manager installs SIGINT/SIGTERM handlers on the event loop, sets a
    ``ThreadPoolExecutor`` as the loop's default executor and, on ``start()``,
    runs the dispatcher until ``stop()`` is invoked.
    """

    def __init__(self, source, thread_pool_size=None, event_loop=None, consumers=None):
        """Prepare the loop, the executor and the consumers.

        :param source: passed to the default ``AWSConsumer`` when no
            ``consumers`` are given.
        :param thread_pool_size: ``max_workers`` for the thread pool; ``None``
            lets ``ThreadPoolExecutor`` pick its own default.
        :param event_loop: loop to use; defaults to
            ``asyncio.get_event_loop()``.
        :param consumers: list of consumers to use instead of the default
            single ``AWSConsumer``.
        """
        self._loop = event_loop or asyncio.get_event_loop()
        self._loop.add_signal_handler(signal.SIGINT, self.stop)
        self._loop.add_signal_handler(signal.SIGTERM, self.stop)

        # XXX: See https://github.com/python/asyncio/issues/258
        # The minimum value depends on the number of cores in the machine
        # See https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
        self.thread_pool_size = thread_pool_size
        self._executor = ThreadPoolExecutor(self.thread_pool_size)
        self._loop.set_default_executor(self._executor)

        self.routes = []

        if consumers is None:
            self.consumers = [AWSConsumer(source, {'WaitTimeSeconds': 5, 'MaxNumberOfMessages': 5}, loop=self._loop)]
        else:
            self.consumers = consumers

        # Both are populated by start(); stop() checks them so it is safe to
        # call before the manager has been started.
        self._dispatcher = None
        self._future = None

    def get_dispatcher(self):
        """Return a new ``LoaferDispatcher`` for the current routes and consumers."""
        return LoaferDispatcher(self.routes, self.consumers)

    def start(self):
        """Schedule the dispatcher and run the event loop until it is stopped.

        The loop is closed when ``run_forever()`` returns or raises.
        """
        start_message = 'Starting Loafer - Version: {} (pid={}) ...'
        logger.info(start_message.format(__version__, os.getpid()))

        self._dispatcher = self.get_dispatcher()

        self._future = asyncio.gather(self._dispatcher.dispatch_consumers())
        self._future.add_done_callback(self.on_future__errors)

        try:
            self._loop.run_forever()
        finally:
            self._loop.close()

    def stop(self, *args, **kwargs):
        """Stop consumers, cancel pending work and stop the event loop.

        Accepts and ignores arbitrary arguments. Safe to call before
        ``start()``: the dispatcher and future steps are skipped when they
        do not exist yet.
        """
        logger.info('Stopping Loafer ...')

        if self._dispatcher is not None:
            logger.debug('Stopping consumers ...')
            self._dispatcher.stop_consumers()

        if self._future is not None:
            logger.debug('Cancel schedulled operations ...')
            self._future.cancel()

        logger.debug('Waiting to shutdown ...')
        self._executor.shutdown(wait=True)
        self._loop.stop()

    def on_future__errors(self, future):
        """Done-callback: log a fatal error from ``future`` and stop the manager.

        A cancelled future is ignored. ``stop()`` cancels the future itself,
        and ``Future.exception()`` raises ``CancelledError`` on a cancelled
        future, which would otherwise surface as an error inside this
        callback on every regular shutdown.
        """
        if future.cancelled():
            return

        exc = future.exception()
        # Unhandled errors crashes the event loop execution
        if isinstance(exc, BaseException):
            logger.critical('Fatal error caught: {!r}'.format(exc))
            self.stop()
78
|
|
|
|
This can be caused by one of the following:
1. Missing Dependencies
This error could indicate a Pylint configuration issue. Make sure that your libraries are available by adding the necessary install commands.
2. Missing __init__.py files
This error could also result from missing `__init__.py` files in your module folders. Make sure that you place one such file in each sub-folder.