|
1
|
|
|
import pika |
|
2
|
|
|
from helpers.queuehelper import QueueName |
|
3
|
|
|
|
|
4
|
|
|
class Bus: |
|
5
|
|
|
_connection = None |
|
6
|
|
|
_durable = False |
|
7
|
|
|
|
|
8
|
|
|
def __init__(self, servicelogin): |
|
9
|
|
|
self._servicelogin = servicelogin |
|
10
|
|
|
self._userlogin = self._servicelogin.user |
|
11
|
|
|
if self._userlogin is None: |
|
12
|
|
|
self._userlogin = 'fullcycle' |
|
13
|
|
|
@classmethod |
|
14
|
|
|
def get_queue_name(cls, queue_name): |
|
15
|
|
|
name_of_q = queue_name |
|
16
|
|
|
if isinstance(queue_name, QueueName): |
|
17
|
|
|
name_of_q = QueueName.value(queue_name) |
|
18
|
|
|
return name_of_q |
|
19
|
|
|
|
|
20
|
|
|
def connection(self): |
|
21
|
|
|
if not self._connection: |
|
22
|
|
|
credentials = pika.PlainCredentials(self._userlogin, self._servicelogin.password) |
|
23
|
|
|
parameters = pika.ConnectionParameters(self._servicelogin.host, self._servicelogin.port, '/', credentials) |
|
24
|
|
|
self._connection = pika.BlockingConnection(parameters) |
|
25
|
|
|
return self._connection |
|
26
|
|
|
|
|
27
|
|
|
def close(self): |
|
28
|
|
|
if self._connection: |
|
29
|
|
|
self._connection.close() |
|
30
|
|
|
|
|
31
|
|
|
def sleep(self, seconds): |
|
32
|
|
|
if self._connection: |
|
33
|
|
|
self._connection.sleep(seconds) |
|
34
|
|
|
|
|
35
|
|
|
def publish(self, queue_name, msg, exchange=''): |
|
36
|
|
|
"""Publishes message on new channel""" |
|
37
|
|
|
localchannel = self.connection().channel() |
|
38
|
|
|
localchannel.queue_declare(queue=Bus.get_queue_name(queue_name), durable=self._durable) |
|
39
|
|
|
localchannel.basic_publish(exchange=exchange, routing_key=Bus.get_queue_name(queue_name), body=msg) |
|
40
|
|
|
localchannel.close() |
|
41
|
|
|
|
|
42
|
|
|
def broadcast(self, exchange_name, msg): |
|
43
|
|
|
'''broadcast a message to anyone that is listening''' |
|
44
|
|
|
localchannel = self.connection().channel() |
|
45
|
|
|
localchannel.exchange_declare(exchange=Bus.get_queue_name(exchange_name), exchange_type='fanout') |
|
46
|
|
|
localchannel.basic_publish(exchange=Bus.get_queue_name(exchange_name), routing_key='', body=msg) |
|
47
|
|
|
localchannel.close() |
|
48
|
|
|
|
|
49
|
|
|
def subscribe(self, name, callback, no_acknowledge=True, prefetch_count=1): |
|
50
|
|
|
"""basic subscribe messages from one queue |
|
51
|
|
|
remember to listen to channel to get messages |
|
52
|
|
|
""" |
|
53
|
|
|
localchannel = self.connection().channel() |
|
54
|
|
|
localchannel.queue_declare(queue=Bus.get_queue_name(name)) |
|
55
|
|
|
localchannel.basic_qos(prefetch_count=prefetch_count) |
|
56
|
|
|
localchannel.basic_consume(callback, queue=Bus.get_queue_name(name), no_ack=no_acknowledge) |
|
57
|
|
|
return localchannel |
|
58
|
|
|
|
|
59
|
|
|
def subscribe_broadcast(self, name, callback, no_acknowledge=True, prefetch_count=1): |
|
60
|
|
|
"""Consumes messages from one queue""" |
|
61
|
|
|
localchannel = self.connection().channel() |
|
62
|
|
|
localchannel.exchange_declare(exchange=Bus.get_queue_name(name), exchange_type='fanout') |
|
63
|
|
|
|
|
64
|
|
|
result = localchannel.queue_declare(exclusive=True) |
|
65
|
|
|
queue_name = result.method.queue |
|
66
|
|
|
localchannel.queue_bind(exchange=Bus.get_queue_name(name), queue=queue_name) |
|
67
|
|
|
|
|
68
|
|
|
localchannel.basic_qos(prefetch_count=prefetch_count) |
|
69
|
|
|
localchannel.basic_consume(callback, queue=queue_name, no_ack=no_acknowledge) |
|
70
|
|
|
return localchannel |
|
71
|
|
|
|
|
72
|
|
|
def listen(self, channel): |
|
73
|
|
|
'''listen to queue. this is a blocking call''' |
|
74
|
|
|
channel.start_consuming() |
|
75
|
|
|
|
|
76
|
|
|
def acknowledge(self, channel, delivery_tag): |
|
77
|
|
|
'''acknowledge that message was processed''' |
|
78
|
|
|
channel.basic_ack(delivery_tag) |
|
79
|
|
|
|
|
80
|
|
|
def reject(self, channel, delivery_tag): |
|
81
|
|
|
'''tell queue that message was not processed''' |
|
82
|
|
|
channel.basic_nack(delivery_tag) |
|
83
|
|
|
|