Completed
Pull Request — master (#15)
by
unknown
02:46
created

LoaferManager.routes()   A

Complexity

Conditions 4

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 4
dl 0
loc 15
rs 9.2

1 Method

Rating   Name   Duplication   Size   Complexity  
A LoaferManager.get_dispatcher() 0 2 1
1
# -*- coding: utf-8 -*-
2
# vi:si:et:sw=4:sts=4:ts=4
3
4
import asyncio
5
from concurrent.futures import ThreadPoolExecutor
6
import logging
7
import os
8
import signal
9
10
from . import __version__
11
from .dispatcher import LoaferDispatcher
0 ignored issues
show
Configuration introduced by
Unable to import 'dispatcher' (invalid syntax (<string>, line 36))

This can be caused by one of the following:

1. Missing Dependencies

This error could indicate a configuration issue of Pylint. Make sure that your libraries are available by adding the necessary commands.

# .scrutinizer.yml
before_commands:
    - sudo pip install abc # Python2
    - sudo pip3 install abc # Python3
Tip: We are currently not using virtualenv to run pylint, when installing your modules make sure to use the command for the correct version.

2. Missing __init__.py files

This error could also result from missing __init__.py files in your module folders. Make sure that you place one file in each sub-folder.

Loading history...
12
from .aws.consumer import Consumer as AWSConsumer
0 ignored issues
show
Bug introduced by
The name consumer does not seem to exist in module loafer.aws.
Loading history...
Configuration introduced by
Unable to import 'aws.consumer' (invalid syntax (<string>, line 29))

This can be caused by one of the following:

1. Missing Dependencies

This error could indicate a configuration issue of Pylint. Make sure that your libraries are available by adding the necessary commands.

# .scrutinizer.yml
before_commands:
    - sudo pip install abc # Python2
    - sudo pip3 install abc # Python3
Tip: We are currently not using virtualenv to run pylint, when installing your modules make sure to use the command for the correct version.

2. Missing __init__.py files

This error could also result from missing __init__.py files in your module folders. Make sure that you place one file in each sub-folder.

Loading history...
13
14
15
logger = logging.getLogger(__name__)
16
17
18
class LoaferManager(object):
19
20
    def __init__(self, source, thread_pool_size=4, event_loop=None, consumers=None):
21
22
        if event_loop is None:
23
            self._loop = asyncio.get_event_loop()
24
        else:
25
            self._loop = event_loop
26
        self._loop.add_signal_handler(signal.SIGINT, self.stop)
27
        self._loop.add_signal_handler(signal.SIGTERM, self.stop)
28
29
        # XXX: See https://github.com/python/asyncio/issues/258
30
        # The minimum value depends on the number of cores in the machine
31
        # See https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
32
        self.thread_pool_size = thread_pool_size
33
        self._executor = ThreadPoolExecutor(self.thread_pool_size)
34
        self._loop.set_default_executor(self._executor)
35
36
        self.routes = []
37
38
        if consumers is None:
39
            self.consumers = [AWSConsumer(source, {'WaitTimeSeconds': 5, 'MaxNumberOfMessages': 5}, loop=self._loop)]
40
        else:
41
            self.consumers = consumers
42
43
        self._dispatcher = None
44
45
    def get_dispatcher(self):
46
        return LoaferDispatcher(self.routes, self.consumers)
47
48
    def start(self):
49
        start_message = 'Starting Loafer - Version: {} (pid={}) ...'
50
        logger.info(start_message.format(__version__, os.getpid()))
51
52
        self._dispatcher = self.get_dispatcher()
53
54
        self._future = asyncio.gather(self._dispatcher.dispatch_consumers())
0 ignored issues
show
Coding Style introduced by
The attribute _future was defined outside __init__.

It is generally a good practice to initialize all attributes to default values in the __init__ method:

class Foo:
    def __init__(self, x=None):
        self.x = x
Loading history...
55
        self._future.add_done_callback(self.on_future__errors)
56
57
        try:
58
            self._loop.run_forever()
59
        finally:
60
            self._loop.close()
61
62
    def stop(self, *args, **kwargs):
0 ignored issues
show
Unused Code introduced by
The argument args seems to be unused.
Loading history...
Unused Code introduced by
The argument kwargs seems to be unused.
Loading history...
63
        logger.info('Stopping Loafer ...')
64
65
        logger.debug('Stopping consumers ...')
66
        self._dispatcher.stop_consumers()
67
68
        logger.debug('Cancel schedulled operations ...')
69
        self._future.cancel()
70
71
        logger.debug('Waiting to shutdown ...')
72
        self._executor.shutdown(wait=True)
73
        self._loop.stop()
74
75
    def on_future__errors(self, future):
76
        exc = future.exception()
77
        # Unhandled errors crashes the event loop execution
78
        if isinstance(exc, BaseException):
79
            logger.critical('Fatal error caught: {!r}'.format(exc))
80
            self.stop()
81