backend.fcmbus   A
last analyzed

Complexity

Total Complexity 17

Size/Duplication

Total Lines 83
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 60
dl 0
loc 83
rs 10
c 0
b 0
f 0
wmc 17

12 Methods

Rating   Name   Duplication   Size   Complexity  
A Bus.subscribe_broadcast() 0 12 1
A Bus.__init__() 0 5 2
A Bus.acknowledge() 0 3 1
A Bus.publish() 0 6 1
A Bus.broadcast() 0 6 1
A Bus.listen() 0 3 1
A Bus.subscribe() 0 9 1
A Bus.reject() 0 3 1
A Bus.close() 0 3 2
A Bus.get_queue_name() 0 6 2
A Bus.connection() 0 6 2
A Bus.sleep() 0 3 2
1
import pika
2
from helpers.queuehelper import QueueName
3
4
class Bus:
    """Small convenience wrapper around a pika ``BlockingConnection``.

    The connection is opened lazily on first use (see ``connection``) and
    reused by every publish/subscribe call. Each operation opens its own
    channel on that shared connection.
    """

    # Cached connection; ``None`` until ``connection()`` is first called and
    # again after ``close()``.
    _connection = None
    # ``durable`` flag used whenever a named queue is declared. Publishers and
    # subscribers must declare a queue with the same flag.
    _durable = False

    def __init__(self, servicelogin):
        """Remember the login settings; no connection is opened here.

        Args:
            servicelogin: object exposing ``user``, ``password``, ``host``
                and ``port`` attributes. When ``user`` is ``None`` the
                login name falls back to ``'fullcycle'``.
        """
        self._servicelogin = servicelogin
        self._userlogin = self._servicelogin.user
        if self._userlogin is None:
            self._userlogin = 'fullcycle'

    @classmethod
    def get_queue_name(cls, queue_name):
        """Return the name to use for a queue or exchange.

        A ``QueueName`` instance is converted with ``QueueName.value``
        (presumably yielding its string name - confirm against the helper);
        any other value is returned unchanged.
        """
        if isinstance(queue_name, QueueName):
            return QueueName.value(queue_name)
        return queue_name

    def connection(self):
        """Return the cached connection, opening it on first use."""
        if not self._connection:
            credentials = pika.PlainCredentials(self._userlogin, self._servicelogin.password)
            parameters = pika.ConnectionParameters(
                self._servicelogin.host, self._servicelogin.port, '/', credentials)
            self._connection = pika.BlockingConnection(parameters)
        return self._connection

    def close(self):
        """Close the connection if one was opened; safe to call repeatedly."""
        if self._connection:
            try:
                self._connection.close()
            finally:
                # Drop the reference even if close() raised, so the next
                # connection() call reconnects instead of handing back a
                # dead connection object.
                self._connection = None

    def sleep(self, seconds):
        """Sleep via the connection's own ``sleep``; no-op when not connected."""
        if self._connection:
            self._connection.sleep(seconds)

    def publish(self, queue_name, msg, exchange=''):
        """Declare the queue and publish ``msg`` to it on a new channel.

        Args:
            queue_name: queue name or ``QueueName``; also used as routing key.
            msg: message body.
            exchange: exchange to publish through ('' by default).
        """
        queue = Bus.get_queue_name(queue_name)
        localchannel = self.connection().channel()
        try:
            localchannel.queue_declare(queue=queue, durable=self._durable)
            localchannel.basic_publish(exchange=exchange, routing_key=queue, body=msg)
        finally:
            # Release the channel even when declare/publish fails; skip the
            # close when the failure already closed it.
            if localchannel.is_open:
                localchannel.close()

    def broadcast(self, exchange_name, msg):
        """Publish ``msg`` to a fanout exchange so every bound queue gets it."""
        exchange = Bus.get_queue_name(exchange_name)
        localchannel = self.connection().channel()
        try:
            localchannel.exchange_declare(exchange=exchange, exchange_type='fanout')
            localchannel.basic_publish(exchange=exchange, routing_key='', body=msg)
        finally:
            # Same clean-up rule as publish().
            if localchannel.is_open:
                localchannel.close()

    def subscribe(self, name, callback, no_acknowledge=True, prefetch_count=1):
        """Register ``callback`` as consumer of one named queue.

        The returned channel must be passed to ``listen`` to start receiving
        messages.

        Args:
            name: queue name or ``QueueName``.
            callback: consumer callback handed to ``basic_consume``.
            no_acknowledge: forwarded as ``no_ack``.
            prefetch_count: forwarded to ``basic_qos``.

        Returns:
            The channel the consumer was registered on.
        """
        queue = Bus.get_queue_name(name)
        localchannel = self.connection().channel()
        # Declare with the same durability publish() uses, so both sides
        # agree on the queue's properties.
        localchannel.queue_declare(queue=queue, durable=self._durable)
        localchannel.basic_qos(prefetch_count=prefetch_count)
        # NOTE(review): argument order / ``no_ack`` look like the pre-1.0
        # pika signature - confirm against the pinned pika version.
        localchannel.basic_consume(callback, queue=queue, no_ack=no_acknowledge)
        return localchannel

    def subscribe_broadcast(self, name, callback, no_acknowledge=True, prefetch_count=1):
        """Register ``callback`` for messages broadcast on a fanout exchange.

        Declares the exchange, declares an exclusive queue whose name is
        taken from the declare result, binds it to the exchange and consumes
        from it. Pass the returned channel to ``listen``.

        Args:
            name: exchange name or ``QueueName``.
            callback: consumer callback handed to ``basic_consume``.
            no_acknowledge: forwarded as ``no_ack``.
            prefetch_count: forwarded to ``basic_qos``.

        Returns:
            The channel the consumer was registered on.
        """
        exchange = Bus.get_queue_name(name)
        localchannel = self.connection().channel()
        localchannel.exchange_declare(exchange=exchange, exchange_type='fanout')

        result = localchannel.queue_declare(exclusive=True)
        queue_name = result.method.queue
        localchannel.queue_bind(exchange=exchange, queue=queue_name)

        localchannel.basic_qos(prefetch_count=prefetch_count)
        localchannel.basic_consume(callback, queue=queue_name, no_ack=no_acknowledge)
        return localchannel

    def listen(self, channel):
        """Start consuming on ``channel``. This is a blocking call."""
        channel.start_consuming()

    def acknowledge(self, channel, delivery_tag):
        """Acknowledge that the message was processed."""
        channel.basic_ack(delivery_tag)

    def reject(self, channel, delivery_tag):
        """Tell the queue that the message was not processed."""
        channel.basic_nack(delivery_tag)
83