1
|
|
|
import pika |
2
|
|
|
from helpers.queuehelper import QueueName |
3
|
|
|
|
4
|
|
|
class Bus:
    """Small convenience wrapper around a lazily created pika ``BlockingConnection``.

    Offers point-to-point messaging (``publish`` / ``subscribe``) and fanout
    messaging (``broadcast`` / ``subscribe_broadcast``). Queue and exchange
    names may be given either as plain values or as ``QueueName`` members.
    """

    # Class-level default; shadowed on the instance once connection() connects.
    _connection = None
    # Durability flag handed to the named-queue declarations made by this class.
    _durable = False

    def __init__(self, servicelogin):
        """Store the broker login details.

        Args:
            servicelogin: object exposing ``user``, ``password``, ``host`` and
                ``port`` attributes. When ``user`` is None the login name
                falls back to ``'fullcycle'``.
        """
        self._servicelogin = servicelogin
        self._userlogin = self._servicelogin.user
        if self._userlogin is None:
            self._userlogin = 'fullcycle'

    def connection(self):
        """Return the cached connection, opening it on first use.

        Returns:
            The ``pika.BlockingConnection`` shared by every channel this
            instance creates.
        """
        if self._connection is None:
            credentials = pika.PlainCredentials(self._userlogin, self._servicelogin.password)
            parameters = pika.ConnectionParameters(
                self._servicelogin.host, self._servicelogin.port, '/', credentials)
            self._connection = pika.BlockingConnection(parameters)
        return self._connection

    def close(self):
        """Close the cached connection, if any, and forget it.

        The reference is cleared before closing so that a later
        ``connection()`` call opens a fresh connection instead of handing
        back the closed one, and so that calling ``close()`` twice is a no-op.
        """
        conn, self._connection = self._connection, None
        if conn is not None:
            conn.close()

    def sleep(self, seconds):
        """Delegate to the connection's ``sleep`` for ``seconds``.

        Does nothing (and does not wait) when no connection has been opened.
        """
        if self._connection is not None:
            self._connection.sleep(seconds)

    def get_queue_name(self, queue_name):
        """Return the name to use for a queue or exchange.

        Args:
            queue_name: either a ``QueueName`` member, which is converted with
                ``QueueName.value``, or any other value, which is returned
                unchanged.
        """
        if isinstance(queue_name, QueueName):
            return QueueName.value(queue_name)
        return queue_name

    @staticmethod
    def _close_channel(channel):
        """Close ``channel`` unless it is already closed."""
        # A failed declare/publish can leave the channel closed by the broker;
        # closing it again could raise and hide the original error.
        if channel.is_open:
            channel.close()

    def publish(self, queue_name, msg, exchange=''):
        """Publish ``msg`` to a queue using a short-lived channel.

        Args:
            queue_name: queue to declare and use as the routing key
                (plain name or ``QueueName`` member).
            msg: message body.
            exchange: exchange to publish through; defaults to ``''``.
        """
        name = self.get_queue_name(queue_name)
        channel = self.connection().channel()
        try:
            channel.queue_declare(queue=name, durable=self._durable)
            channel.basic_publish(exchange=exchange, routing_key=name, body=msg)
        finally:
            # Release the channel even when declaring or publishing fails.
            self._close_channel(channel)

    def broadcast(self, exchange_name, msg):
        """Publish ``msg`` to a fanout exchange using a short-lived channel.

        Args:
            exchange_name: exchange to declare and publish to
                (plain name or ``QueueName`` member).
            msg: message body.
        """
        name = self.get_queue_name(exchange_name)
        channel = self.connection().channel()
        try:
            channel.exchange_declare(exchange=name, exchange_type='fanout')
            channel.basic_publish(exchange=name, routing_key='', body=msg)
        finally:
            # Release the channel even when declaring or publishing fails.
            self._close_channel(channel)

    def subscribe(self, name, callback, no_acknowledge=True, prefetch_count=1):
        """Register ``callback`` as a consumer of one named queue.

        Nothing is delivered until the returned channel is passed to
        ``listen``.

        Args:
            name: queue to declare and consume (plain name or ``QueueName``).
            callback: consumer callback handed to ``basic_consume``.
            no_acknowledge: forwarded to ``basic_consume`` as ``no_ack``.
            prefetch_count: forwarded to ``basic_qos``.

        Returns:
            The channel the consumer was registered on.
        """
        queue = self.get_queue_name(name)
        channel = self.connection().channel()
        # Declare with the same durability as publish() so both sides agree
        # on the queue definition.
        channel.queue_declare(queue=queue, durable=self._durable)
        channel.basic_qos(prefetch_count=prefetch_count)
        channel.basic_consume(callback, queue=queue, no_ack=no_acknowledge)
        return channel

    def subscribe_broadcast(self, name, callback, no_acknowledge=True, prefetch_count=1):
        """Register ``callback`` as a consumer of a fanout exchange.

        Declares the exchange, declares an exclusive queue whose name is
        taken from the declare result, binds that queue to the exchange and
        consumes from it. Nothing is delivered until the returned channel is
        passed to ``listen``.

        Args:
            name: exchange to declare and bind to (plain name or ``QueueName``).
            callback: consumer callback handed to ``basic_consume``.
            no_acknowledge: forwarded to ``basic_consume`` as ``no_ack``.
            prefetch_count: forwarded to ``basic_qos``.

        Returns:
            The channel the consumer was registered on.
        """
        exchange = self.get_queue_name(name)
        channel = self.connection().channel()
        channel.exchange_declare(exchange=exchange, exchange_type='fanout')

        result = channel.queue_declare(exclusive=True)
        queue_name = result.method.queue
        channel.queue_bind(exchange=exchange, queue=queue_name)

        channel.basic_qos(prefetch_count=prefetch_count)
        channel.basic_consume(callback, queue=queue_name, no_ack=no_acknowledge)
        return channel

    def listen(self, channel):
        """Start consuming on ``channel``. This is a blocking call."""
        channel.start_consuming()

    def acknowledge(self, channel, delivery_tag):
        """Acknowledge that the message with ``delivery_tag`` was processed."""
        channel.basic_ack(delivery_tag)

    def reject(self, channel, delivery_tag):
        """Tell the broker the message with ``delivery_tag`` was not processed."""
        channel.basic_nack(delivery_tag)
83
|
|
|
|
84
|
|
|
|