Completed
Push — master ( 43cb7b...7de6a0 )
by Dave
02:10
created

backend.fcmbus.Bus.sleep()   A

Complexity

Conditions 2

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 3
nop 2
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
import pika
2
from helpers.queuehelper import QueueName
3
4
class Bus:
    """Small convenience wrapper around a pika ``BlockingConnection``.

    The connection is opened lazily on the first call to :meth:`connection`
    and then reused by every publish/subscribe helper until :meth:`close`
    is called.
    """

    # Class-level default; an instance attribute of the same name shadows it
    # once connection() has opened a connection for that instance.
    _connection = None
    # Durability flag used for every named queue this class declares.
    _durable = False

    def __init__(self, servicelogin):
        """Remember the login details used to open the connection.

        Args:
            servicelogin: object exposing ``user``, ``password``, ``host``
                and ``port`` attributes. When ``user`` is ``None`` the
                login name falls back to ``'fullcycle'``.
        """
        self._servicelogin = servicelogin
        self._userlogin = self._servicelogin.user
        if self._userlogin is None:
            self._userlogin = 'fullcycle'

    def connection(self):
        """Return the open connection, creating it on first use."""
        if not self._connection:
            credentials = pika.PlainCredentials(self._userlogin, self._servicelogin.password)
            parameters = pika.ConnectionParameters(
                self._servicelogin.host, self._servicelogin.port, '/', credentials)
            self._connection = pika.BlockingConnection(parameters)
        return self._connection

    def close(self):
        """Close the connection if one was opened; safe to call repeatedly."""
        if self._connection:
            try:
                self._connection.close()
            finally:
                # Forget the connection even when close() raises, so that the
                # next connection() call reconnects instead of handing back a
                # dead connection.
                self._connection = None

    def sleep(self, seconds):
        """Call the connection's ``sleep(seconds)``; no-op when not connected."""
        if self._connection:
            self._connection.sleep(seconds)

    def get_queue_name(self, queue_name):
        """Return the name to use for ``queue_name``.

        ``QueueName`` members are converted through ``QueueName.value``;
        any other value is returned unchanged.
        """
        name_of_q = queue_name
        if isinstance(queue_name, QueueName):
            name_of_q = QueueName.value(queue_name)
        return name_of_q

    def publish(self, queue_name, msg, exchange=''):
        """Publish ``msg`` to ``queue_name`` on a new, short-lived channel.

        Args:
            queue_name: queue name or ``QueueName`` member; also used as the
                routing key.
            msg: message body.
            exchange: exchange to publish through (default exchange if '').
        """
        name_of_q = self.get_queue_name(queue_name)
        localchannel = self.connection().channel()
        try:
            localchannel.queue_declare(queue=name_of_q, durable=self._durable)
            localchannel.basic_publish(exchange=exchange, routing_key=name_of_q, body=msg)
        finally:
            # Always release the channel; skip the close when a failure has
            # already closed it so the original error is not masked.
            if localchannel.is_open:
                localchannel.close()

    def broadcast(self, exchange_name, msg):
        """Broadcast ``msg`` through a fanout exchange on a new channel.

        Args:
            exchange_name: exchange name or ``QueueName`` member.
            msg: message body.
        """
        name_of_x = self.get_queue_name(exchange_name)
        localchannel = self.connection().channel()
        try:
            localchannel.exchange_declare(exchange=name_of_x, exchange_type='fanout')
            localchannel.basic_publish(exchange=name_of_x, routing_key='', body=msg)
        finally:
            # Same cleanup rule as publish().
            if localchannel.is_open:
                localchannel.close()

    def subscribe(self, name, callback, no_acknowledge=True, prefetch_count=1):
        """Register ``callback`` as a consumer of the queue ``name``.

        The returned channel must be passed to :meth:`listen` for messages
        to be delivered.

        Args:
            name: queue name or ``QueueName`` member.
            callback: consumer callback handed to ``basic_consume``.
            no_acknowledge: passed to ``basic_consume`` as ``no_ack``.
            prefetch_count: passed to ``basic_qos``.

        Returns:
            The channel the consumer was registered on.
        """
        name_of_q = self.get_queue_name(name)
        localchannel = self.connection().channel()
        # Declare with the same durability publish() uses so both sides agree
        # on the queue's properties.
        localchannel.queue_declare(queue=name_of_q, durable=self._durable)
        localchannel.basic_qos(prefetch_count=prefetch_count)
        localchannel.basic_consume(callback, queue=name_of_q, no_ack=no_acknowledge)
        return localchannel

    def subscribe_broadcast(self, name, callback, no_acknowledge=True, prefetch_count=1):
        """Register ``callback`` as a consumer of the fanout exchange ``name``.

        An exclusive, server-named queue is declared and bound to the
        exchange, and the callback consumes from that queue.

        Args:
            name: exchange name or ``QueueName`` member.
            callback: consumer callback handed to ``basic_consume``.
            no_acknowledge: passed to ``basic_consume`` as ``no_ack``.
            prefetch_count: passed to ``basic_qos``.

        Returns:
            The channel the consumer was registered on.
        """
        name_of_x = self.get_queue_name(name)
        localchannel = self.connection().channel()
        localchannel.exchange_declare(exchange=name_of_x, exchange_type='fanout')

        result = localchannel.queue_declare(exclusive=True)
        queue_name = result.method.queue
        localchannel.queue_bind(exchange=name_of_x, queue=queue_name)

        localchannel.basic_qos(prefetch_count=prefetch_count)
        localchannel.basic_consume(callback, queue=queue_name, no_ack=no_acknowledge)
        return localchannel

    def listen(self, channel):
        """Start consuming on ``channel``. This is a blocking call."""
        channel.start_consuming()

    def acknowledge(self, channel, delivery_tag):
        """Acknowledge that the message ``delivery_tag`` was processed."""
        channel.basic_ack(delivery_tag)

    def reject(self, channel, delivery_tag):
        """Tell the queue that the message ``delivery_tag`` was not processed."""
        channel.basic_nack(delivery_tag)
83
84