1 | # |
||
2 | # Provides a Python-based interface for subscribing to qpid queues and topics. |
||
3 | # |
||
4 | # |
||
5 | # |
||
6 | # SOFTWARE HISTORY |
||
7 | # |
||
8 | # Date Ticket# Engineer Description |
||
9 | # ------------ ---------- ----------- -------------------------- |
||
10 | # 11/17/10 njensen Initial Creation. |
||
11 | # 08/15/13 2169 bkowal Optionally gzip decompress any data that is read. |
||
12 | # 08/04/16 2416 tgurney Add queueStarted property |
||
13 | # 02/16/17 6084 bsteffen Support ssl connections |
||
14 | # 09/07/17 6175 tgurney Remove "decompressing" log message |
||
15 | # |
||
16 | # |
||
17 | |||
18 | import os |
||
19 | import os.path |
||
20 | import qpid |
||
21 | import zlib |
||
22 | |||
23 | from Queue import Empty |
||
24 | from qpid.exceptions import Closed |
||
25 | |||
26 | |||
27 | View Code Duplication | class QpidSubscriber: |
|
0 ignored issues
–
show
Duplication
introduced
by
![]() |
|||
28 | |||
29 | def __init__(self, host='127.0.0.1', port=5672, decompress=False, ssl=None): |
||
30 | self.host = host |
||
31 | self.port = port |
||
32 | self.decompress = decompress |
||
33 | socket = qpid.util.connect(host, port) |
||
34 | if "QPID_SSL_CERT_DB" in os.environ: |
||
35 | certdb = os.environ["QPID_SSL_CERT_DB"] |
||
36 | else: |
||
37 | certdb = os.path.expanduser("~/.qpid/") |
||
38 | if "QPID_SSL_CERT_NAME" in os.environ: |
||
39 | certname = os.environ["QPID_SSL_CERT_NAME"] |
||
40 | else: |
||
41 | certname = "guest" |
||
42 | certfile = os.path.join(certdb, certname + ".crt") |
||
43 | if ssl or (ssl is None and os.path.exists(certfile)): |
||
44 | keyfile = os.path.join(certdb, certname + ".key") |
||
45 | trustfile = os.path.join(certdb, "root.crt") |
||
46 | socket = qpid.util.ssl(socket, keyfile=keyfile, certfile=certfile, ca_certs=trustfile) |
||
47 | self.__connection = qpid.connection.Connection(sock=socket, username='guest', password='guest') |
||
48 | self.__connection.start() |
||
49 | self.__session = self.__connection.session(str(qpid.datatypes.uuid4())) |
||
50 | self.subscribed = True |
||
51 | self.__queueStarted = False |
||
52 | |||
53 | def topicSubscribe(self, topicName, callback): |
||
54 | # if the queue is edex.alerts, set decompress to true always for now to |
||
55 | # maintain compatibility with existing python scripts. |
||
56 | if topicName == 'edex.alerts': |
||
57 | self.decompress = True |
||
58 | |||
59 | print("Establishing connection to broker on", self.host) |
||
60 | queueName = topicName + self.__session.name |
||
61 | self.__session.queue_declare(queue=queueName, exclusive=True, auto_delete=True, |
||
62 | arguments={'qpid.max_count': 100, 'qpid.policy_type': 'ring'}) |
||
63 | self.__session.exchange_bind(exchange='amq.topic', queue=queueName, binding_key=topicName) |
||
64 | self.__innerSubscribe(queueName, callback) |
||
65 | |||
66 | def __innerSubscribe(self, serverQueueName, callback): |
||
67 | local_queue_name = 'local_queue_' + serverQueueName |
||
68 | queue = self.__session.incoming(local_queue_name) |
||
69 | self.__session.message_subscribe(serverQueueName, destination=local_queue_name) |
||
70 | queue.start() |
||
71 | print("Connection complete to broker on", self.host) |
||
72 | self.__queueStarted = True |
||
73 | |||
74 | while self.subscribed: |
||
75 | try: |
||
76 | message = queue.get(timeout=10) |
||
77 | content = message.body |
||
78 | self.__session.message_accept(qpid.datatypes.RangedSet(message.id)) |
||
79 | if self.decompress: |
||
80 | try: |
||
81 | # http://stackoverflow.com/questions/2423866/python-decompressing-gzip-chunk-by-chunk |
||
82 | d = zlib.decompressobj(16+zlib.MAX_WBITS) |
||
83 | content = d.decompress(content) |
||
84 | except ValueError: |
||
85 | # decompression failed, return the original content |
||
86 | pass |
||
87 | callback(content) |
||
88 | except Empty: |
||
89 | pass |
||
90 | except Closed: |
||
91 | self.close() |
||
92 | |||
93 | def close(self): |
||
94 | self.__queueStarted = False |
||
95 | self.subscribed = False |
||
96 | try: |
||
97 | self.__session.close(timeout=10) |
||
98 | except ValueError: |
||
99 | pass |
||
100 | |||
101 | @property |
||
102 | def queueStarted(self): |
||
103 | return self.__queueStarted |
||
104 |