Completed
Pull Request — master (#15)

LoaferManager   A

Complexity

Total Complexity 7

Size/Duplication

Total Lines 60
Duplicated Lines 0 %

Importance

Changes 9
Bugs 0 Features 1
Metric                    Value
c (changes)               9
b (bugs)                  0
f (features)              1
dl (duplicated lines)     0
loc (lines of code)       60
rs                        10
wmc (total complexity)    7

5 Methods

Rating   Name                  Duplication   Size   Complexity
A        start()               0             13     1
A        __init__()            0             21     2
A        on_future__errors()   0             6      2
A        stop()                0             12     1
A        get_dispatcher()      0             2      1

# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4

import asyncio
from concurrent.futures import ThreadPoolExecutor
import logging
import os
import signal

from . import __version__
from .dispatcher import LoaferDispatcher
Configuration introduced by
Unable to import 'dispatcher' (invalid syntax (<string>, line 35))

This can be caused by one of the following:

1. Missing Dependencies

This error could indicate a configuration issue of Pylint. Make sure that your libraries are available by adding the necessary commands.

# .scrutinizer.yml
before_commands:
    - sudo pip install abc # Python2
    - sudo pip3 install abc # Python3
Tip: We are currently not using virtualenv to run pylint. When installing your modules, make sure to use the command for the correct Python version.

2. Missing __init__.py files

This error could also result from missing __init__.py files in your module folders. Make sure that you place one file in each sub-folder.
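
The second cause can be checked locally before re-running the inspection. The following is a minimal sketch, not part of Loafer or Scrutinizer: `find_dirs_missing_init` is a hypothetical helper that walks a source tree and lists folders holding `.py` files without an `__init__.py`. It assumes regular (non-namespace) packages.

```python
import os


def find_dirs_missing_init(root):
    """List folders under root that hold .py files but no __init__.py.

    Hypothetical helper for illustration; paths are relative to root.
    """
    missing = []
    for dirpath, _dirnames, filenames in os.walk(root):
        has_python = any(name.endswith('.py') for name in filenames)
        if has_python and '__init__.py' not in filenames:
            missing.append(os.path.relpath(dirpath, root))
    return sorted(missing)
```

An empty list means every folder with Python files is importable as a package. The root itself is reported as `.` if it holds loose scripts.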

from .aws.consumer import Consumer as AWSConsumer
Bug introduced by
The name consumer does not seem to exist in module loafer.aws.
Configuration introduced by
Unable to import 'aws.consumer' (invalid syntax (<string>, line 29))

logger = logging.getLogger(__name__)


class LoaferManager(object):

    def __init__(self, source, thread_pool_size=None, event_loop=None, consumers=None):

        self._loop = event_loop or asyncio.get_event_loop()
        self._loop.add_signal_handler(signal.SIGINT, self.stop)
        self._loop.add_signal_handler(signal.SIGTERM, self.stop)

        # XXX: See https://github.com/python/asyncio/issues/258
        # The minimum value depends on the number of cores in the machine
        # See https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
        self.thread_pool_size = thread_pool_size
        self._executor = ThreadPoolExecutor(self.thread_pool_size)
        self._loop.set_default_executor(self._executor)

        self.routes = []

        if consumers is None:
            self.consumers = [AWSConsumer(source, {'WaitTimeSeconds': 5, 'MaxNumberOfMessages': 5}, loop=self._loop)]
        else:
            self.consumers = consumers

        self._dispatcher = None

    def get_dispatcher(self):
        return LoaferDispatcher(self.routes, self.consumers)

    def start(self):
        start_message = 'Starting Loafer - Version: {} (pid={}) ...'
        logger.info(start_message.format(__version__, os.getpid()))

        self._dispatcher = self.get_dispatcher()

        self._future = asyncio.gather(self._dispatcher.dispatch_consumers())
Coding Style introduced by
The attribute _future was defined outside __init__.

It is generally a good practice to initialize all attributes to default values in the __init__ method:

class Foo:
    def __init__(self, x=None):
        self.x = x
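
Applied to the class under review, the same advice could look like the sketch below. `ManagerSketch` is a hypothetical stand-in, not the Loafer code: it only shows `_future` being declared in `__init__` so that `stop()` can be called before `start()` without an `AttributeError`. A plain `concurrent.futures.Future` is used as a placeholder for the gathered future.

```python
from concurrent.futures import Future


class ManagerSketch:
    """Hypothetical stand-in for LoaferManager; only attribute setup is shown."""

    def __init__(self):
        # Declare every attribute here, including ones assigned later.
        self._future = None

    def start(self, future):
        # The reviewed code assigns the result of asyncio.gather(...) here.
        self._future = future

    def stop(self):
        # Returns True only if a future existed and was cancelled.
        if self._future is None:
            return False
        return self._future.cancel()


manager = ManagerSketch()
stopped_before_start = manager.stop()   # no AttributeError, nothing to cancel

pending = Future()                      # placeholder for the gathered future
manager.start(pending)
stopped_after_start = manager.stop()
```

Whether an early `stop()` should be a silent no-op or log a warning is a design choice left open here.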

        self._future.add_done_callback(self.on_future__errors)

        try:
            self._loop.run_forever()
        finally:
            self._loop.close()

    def stop(self, *args, **kwargs):
Unused Code introduced by
The argument kwargs seems to be unused.
Unused Code introduced by
The argument args seems to be unused.

        logger.info('Stopping Loafer ...')

        logger.debug('Stopping consumers ...')
        self._dispatcher.stop_consumers()

        logger.debug('Cancel scheduled operations ...')
        self._future.cancel()

        logger.debug('Waiting to shutdown ...')
        self._executor.shutdown(wait=True)
        self._loop.stop()

    def on_future__errors(self, future):
        exc = future.exception()
        # Unhandled errors crash the event loop execution
        if isinstance(exc, BaseException):
            logger.critical('Fatal error caught: {!r}'.format(exc))
            self.stop()
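
One thing to keep in mind with a done callback like `on_future__errors`: `Future.exception()` raises `CancelledError` when the future was cancelled, and `stop()` cancels `self._future`. A minimal sketch of a guard follows. `exception_or_none` is a hypothetical helper name, not part of Loafer, and the futures are created by hand only to exercise both paths.

```python
import asyncio


def exception_or_none(future):
    """Return the exception stored in a finished future, or None.

    Hypothetical helper: a cancelled future counts as a normal shutdown,
    because calling future.exception() on it would raise CancelledError.
    """
    if future.cancelled():
        return None
    return future.exception()


loop = asyncio.new_event_loop()
try:
    cancelled = loop.create_future()
    cancelled.cancel()

    failed = loop.create_future()
    error = RuntimeError('consumer failed')
    failed.set_exception(error)

    cancelled_result = exception_or_none(cancelled)
    failed_result = exception_or_none(failed)
finally:
    loop.close()
```

Inside the callback, the returned value can then be logged and acted on only when it is not `None`.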
78