|
1
|
|
|
# |
|
2
|
|
|
# Provides a Python-based interface for subscribing to qpid queues and topics. |
|
3
|
|
|
# |
|
4
|
|
|
# |
|
5
|
|
|
# |
|
6
|
|
|
# SOFTWARE HISTORY |
|
7
|
|
|
# |
|
8
|
|
|
# Date Ticket# Engineer Description |
|
9
|
|
|
# ------------ ---------- ----------- -------------------------- |
|
10
|
|
|
# 11/17/10 njensen Initial Creation. |
|
11
|
|
|
# 08/15/13 2169 bkowal Optionally gzip decompress any data that is read. |
|
12
|
|
|
# 08/04/16 2416 tgurney Add queueStarted property |
|
13
|
|
|
# 02/16/17 6084 bsteffen Support ssl connections |
|
14
|
|
|
# 09/07/17 6175 tgurney Remove "decompressing" log message |
|
15
|
|
|
# |
|
16
|
|
|
# |
|
17
|
|
|
|
|
18
|
|
|
import os |
|
19
|
|
|
import os.path |
|
20
|
|
|
import qpid |
|
21
|
|
|
import zlib |
|
22
|
|
|
|
|
23
|
|
|
from Queue import Empty |
|
24
|
|
|
from qpid.exceptions import Closed |
|
25
|
|
|
|
|
26
|
|
|
|
|
27
|
|
View Code Duplication |
class QpidSubscriber:
    """Subscribe to a qpid topic and pass each message body to a callback.

    The constructor opens the broker connection and a session;
    topicSubscribe() then blocks in a receive loop until close() is called
    or the broker connection is reported closed.
    """

    def __init__(self, host='127.0.0.1', port=5672, decompress=False, ssl=None):
        """Connect to the broker and start a session.

        Args:
            host: broker host name or address.
            port: broker port.
            decompress: when true, message bodies are gzip-decompressed
                before they are handed to the callback.
            ssl: a true value forces an SSL connection; None enables SSL
                only if the client certificate file exists; any other
                false value disables SSL.

        The certificate directory is taken from the QPID_SSL_CERT_DB
        environment variable (default "~/.qpid/") and the certificate base
        name from QPID_SSL_CERT_NAME (default "guest").
        """
        self.host = host
        self.port = port
        self.decompress = decompress
        sock = qpid.util.connect(host, port)

        certdb = os.environ.get("QPID_SSL_CERT_DB")
        if certdb is None:
            certdb = os.path.expanduser("~/.qpid/")
        certname = os.environ.get("QPID_SSL_CERT_NAME", "guest")
        certfile = os.path.join(certdb, certname + ".crt")

        if ssl or (ssl is None and os.path.exists(certfile)):
            keyfile = os.path.join(certdb, certname + ".key")
            trustfile = os.path.join(certdb, "root.crt")
            sock = qpid.util.ssl(sock, keyfile=keyfile, certfile=certfile, ca_certs=trustfile)

        self.__connection = qpid.connection.Connection(sock=sock, username='guest', password='guest')
        self.__connection.start()
        self.__session = self.__connection.session(str(qpid.datatypes.uuid4()))
        self.subscribed = True
        self.__queueStarted = False

    def topicSubscribe(self, topicName, callback):
        """Bind a private queue to topicName and deliver messages to callback.

        Declares an exclusive, auto-delete queue named after the topic and
        the session, binds it to the 'amq.topic' exchange with topicName as
        the binding key, then enters the receive loop. This call does not
        return until the subscription ends.

        Args:
            topicName: binding key of the topic to subscribe to.
            callback: callable invoked with the body of every message.
        """
        # if the queue is edex.alerts, set decompress to true always for now to
        # maintain compatibility with existing python scripts.
        if topicName == 'edex.alerts':
            self.decompress = True

        print("Establishing connection to broker on", self.host)
        queueName = topicName + self.__session.name
        self.__session.queue_declare(queue=queueName, exclusive=True, auto_delete=True,
                                     arguments={'qpid.max_count': 100, 'qpid.policy_type': 'ring'})
        self.__session.exchange_bind(exchange='amq.topic', queue=queueName, binding_key=topicName)
        self.__innerSubscribe(queueName, callback)

    def _decompressBody(self, content):
        """Return content gzip-decompressed, or unchanged when that is not possible.

        Nothing is done when self.decompress is false. If the data cannot
        be decompressed the original content is returned.
        """
        if not self.decompress:
            return content
        try:
            # http://stackoverflow.com/questions/2423866/python-decompressing-gzip-chunk-by-chunk
            # 16 + MAX_WBITS makes zlib expect a gzip header and trailer.
            inflater = zlib.decompressobj(16 + zlib.MAX_WBITS)
            return inflater.decompress(content)
        except (ValueError, zlib.error):
            # zlib reports malformed input with zlib.error, which is not a
            # ValueError; catch both so the fallback really happens.
            # decompression failed, return the original content
            return content

    def __innerSubscribe(self, serverQueueName, callback):
        """Subscribe to serverQueueName and run the receive loop.

        Each message is accepted on the session first, then optionally
        decompressed, then passed to callback. The loop polls with a
        10 second timeout so that a change of self.subscribed is noticed.
        """
        local_queue_name = 'local_queue_' + serverQueueName
        queue = self.__session.incoming(local_queue_name)
        self.__session.message_subscribe(serverQueueName, destination=local_queue_name)
        queue.start()
        print("Connection complete to broker on", self.host)
        self.__queueStarted = True

        while self.subscribed:
            try:
                message = queue.get(timeout=10)
                content = message.body
                self.__session.message_accept(qpid.datatypes.RangedSet(message.id))
                callback(self._decompressBody(content))
            except Empty:
                # Nothing arrived within the timeout; poll again.
                pass
            except Closed:
                self.close()

    def close(self):
        """Stop the receive loop and close the session.

        Clears the queueStarted and subscribed flags, then closes the
        session with a 10 second timeout. A ValueError raised by the
        session close is ignored.
        """
        self.__queueStarted = False
        self.subscribed = False
        try:
            self.__session.close(timeout=10)
        except ValueError:
            pass

    @property
    def queueStarted(self):
        """True once the local queue has been started, False after close()."""
        return self.__queueStarted
|
104
|
|
|
|