|
1
|
|
|
# -*- coding: utf-8 -*- |
|
2
|
|
|
# vi:si:et:sw=4:sts=4:ts=4 |
|
3
|
|
|
|
|
4
|
|
|
import asyncio |
|
5
|
|
|
from concurrent.futures import ThreadPoolExecutor |
|
6
|
|
|
import logging |
|
7
|
|
|
import os |
|
8
|
|
|
import signal |
|
9
|
|
|
|
|
10
|
|
|
from . import __version__ |
|
11
|
|
|
from .dispatcher import LoaferDispatcher |
|
|
|
|
|
|
12
|
|
|
from .aws.consumer import Consumer as AWSConsumer |
|
|
|
|
|
|
13
|
|
|
|
|
14
|
|
|
|
|
15
|
|
|
logger = logging.getLogger(__name__) |
|
16
|
|
|
|
|
17
|
|
|
|
|
18
|
|
|
class LoaferManager(object):
    """Drive a ``LoaferDispatcher`` on an asyncio event loop until stopped.

    The manager owns the event loop, a thread pool installed as the loop's
    default executor, the list of consumers and the future that wraps the
    dispatcher's ``dispatch_consumers()`` call.
    """

    def __init__(self, source, thread_pool_size=4, event_loop=None):
        """Prepare the loop, executor and consumers; nothing runs yet.

        Args:
            source: Passed as the first argument to ``AWSConsumer``.
            thread_pool_size: Worker count for the ``ThreadPoolExecutor``
                installed as the loop's default executor.
            event_loop: Loop to use; when ``None`` the result of
                ``asyncio.get_event_loop()`` is used.
        """
        if event_loop is None:
            self._loop = asyncio.get_event_loop()
        else:
            self._loop = event_loop
        # SIGINT / SIGTERM trigger stop(); because they are registered here,
        # stop() may be invoked before start() has ever been called.
        self._loop.add_signal_handler(signal.SIGINT, self.stop)
        self._loop.add_signal_handler(signal.SIGTERM, self.stop)

        # XXX: See https://github.com/python/asyncio/issues/258
        # The minimum value depends on the number of cores in the machine
        # See https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
        self.thread_pool_size = thread_pool_size
        self._executor = ThreadPoolExecutor(self.thread_pool_size)
        self._loop.set_default_executor(self._executor)

        self.routes = []
        self.consumers = [AWSConsumer(source, {'WaitTimeSeconds': 5, 'MaxNumberOfMessages': 5}, loop=self._loop)]
        self._dispatcher = None
        # Assigned by start(); kept as None until then so that stop() can
        # tell "never started" apart from "running".
        self._future = None

    def get_dispatcher(self):
        """Return a new ``LoaferDispatcher`` for the current routes and consumers."""
        return LoaferDispatcher(self.routes, self.consumers)

    def start(self):
        """Schedule the dispatcher and block in ``run_forever()``.

        The loop is closed when ``run_forever()`` returns or raises.
        """
        start_message = 'Starting Loafer - Version: {} (pid={}) ...'
        logger.info(start_message.format(__version__, os.getpid()))

        self._dispatcher = self.get_dispatcher()

        self._future = asyncio.gather(self._dispatcher.dispatch_consumers())
        self._future.add_done_callback(self.on_future__errors)

        try:
            self._loop.run_forever()
        finally:
            self._loop.close()

    def stop(self, *args, **kwargs):
        """Stop consumers, cancel the pending future and stop the loop.

        Extra positional/keyword arguments are accepted and ignored.
        Safe to call before ``start()``: the dispatcher and future steps are
        skipped while they are still ``None``.
        """
        logger.info('Stopping Loafer ...')

        logger.debug('Stopping consumers ...')
        if self._dispatcher is not None:
            self._dispatcher.stop_consumers()

        logger.debug('Cancel schedulled operations ...')
        if self._future is not None:
            self._future.cancel()

        logger.debug('Waiting to shutdown ...')
        self._executor.shutdown(wait=True)
        self._loop.stop()

    def on_future__errors(self, future):
        """Done-callback: log a fatal error from ``future`` and stop.

        A cancelled future (the normal result of ``stop()``) or one that
        finished without an exception is ignored.
        """
        # exception() raises CancelledError on a cancelled future, so the
        # cancellation case has to be checked first.
        if future.cancelled():
            return

        exc = future.exception()
        # Unhandled errors crashes the event loop execution
        if exc is not None:
            logger.critical('Fatal error caught: {!r}'.format(exc))
            self.stop()
|
76
|
|
|
|
This can be caused by one of the following:
1. Missing Dependencies
This error could indicate a Pylint configuration issue. Make sure that your libraries are available by adding the commands needed to install them.
2. Missing __init__.py files
This error could also result from missing
__init__.py files in your module folders. Make sure that you place one such file in each sub-folder.