Unidata /
python-awips
| 1 | # |
||
| 2 | # Provides a Python-based interface for subscribing to qpid queues and topics. |
||
| 3 | # |
||
| 4 | # |
||
| 5 | # |
||
| 6 | # SOFTWARE HISTORY |
||
| 7 | # |
||
| 8 | # Date Ticket# Engineer Description |
||
| 9 | # ------------ ---------- ----------- -------------------------- |
||
| 10 | # 11/17/10 njensen Initial Creation. |
||
| 11 | # 08/15/13 2169 bkowal Optionally gzip decompress any data that is read. |
||
| 12 | # 08/04/16 2416 tgurney Add queueStarted property |
||
| 13 | # 02/16/17 6084 bsteffen Support ssl connections |
||
| 14 | # 09/07/17 6175 tgurney Remove "decompressing" log message |
||
| 15 | # |
||
| 16 | # |
||
| 17 | |||
| 18 | import os |
||
| 19 | import os.path |
||
| 20 | import qpid |
||
| 21 | import zlib |
||
| 22 | |||
| 23 | from Queue import Empty |
||
| 24 | from qpid.exceptions import Closed |
||
| 25 | |||
| 26 | |||
class QpidSubscriber:
    """Subscribe to a topic on a qpid broker and hand each message to a callback.

    The constructor opens the connection (wrapped in SSL when requested or
    when a client certificate is found) and starts a session.
    ``topicSubscribe`` then blocks in a receive loop until ``close`` is
    called or the broker connection is closed.
    """

    def __init__(self, host='127.0.0.1', port=5672, decompress=False, ssl=None):
        """Connect to the broker and open a uniquely named session.

        Args:
            host: Broker host name or address.
            port: Broker port.
            decompress: When true, message bodies are gzip-decompressed
                before being passed to the callback.
            ssl: A true value forces SSL, a false value other than None
                disables it, and None (the default) enables SSL only when
                the client certificate file exists.
        """
        self.host = host
        self.port = port
        self.decompress = decompress
        socket = qpid.util.connect(host, port)

        # Certificate directory and base name come from the environment,
        # falling back to ~/.qpid/ and "guest".
        certdb = os.environ.get("QPID_SSL_CERT_DB", os.path.expanduser("~/.qpid/"))
        certname = os.environ.get("QPID_SSL_CERT_NAME", "guest")
        certfile = os.path.join(certdb, certname + ".crt")

        if ssl or (ssl is None and os.path.exists(certfile)):
            keyfile = os.path.join(certdb, certname + ".key")
            trustfile = os.path.join(certdb, "root.crt")
            socket = qpid.util.ssl(socket, keyfile=keyfile, certfile=certfile, ca_certs=trustfile)

        self.__connection = qpid.connection.Connection(sock=socket, username='guest', password='guest')
        self.__connection.start()
        self.__session = self.__connection.session(str(qpid.datatypes.uuid4()))
        self.subscribed = True
        self.__queueStarted = False

    def topicSubscribe(self, topicName, callback):
        """Bind an exclusive queue to ``topicName`` and deliver messages to ``callback``.

        This call blocks in the receive loop until the subscription ends.

        Args:
            topicName: Binding key on the ``amq.topic`` exchange.
            callback: Callable invoked with the body of every received message.
        """
        # if the queue is edex.alerts, set decompress to true always for now to
        # maintain compatibility with existing python scripts.
        if topicName == 'edex.alerts':
            self.decompress = True

        print("Establishing connection to broker on", self.host)
        # The session name is a fresh UUID, so the queue name is unique to
        # this subscriber.
        queueName = topicName + self.__session.name
        self.__session.queue_declare(queue=queueName, exclusive=True, auto_delete=True,
                                     arguments={'qpid.max_count': 100, 'qpid.policy_type': 'ring'})
        self.__session.exchange_bind(exchange='amq.topic', queue=queueName, binding_key=topicName)
        self.__innerSubscribe(queueName, callback)

    @staticmethod
    def _gunzip_or_passthrough(content):
        """Return ``content`` gzip-decompressed, or unchanged if it cannot be decompressed."""
        try:
            # http://stackoverflow.com/questions/2423866/python-decompressing-gzip-chunk-by-chunk
            # 16 + MAX_WBITS tells zlib to expect a gzip header and trailer.
            return zlib.decompressobj(16 + zlib.MAX_WBITS).decompress(content)
        except (zlib.error, ValueError):
            # zlib reports malformed or non-gzip input with zlib.error, which is
            # not a ValueError; catching only ValueError let such messages abort
            # the receive loop. Fall back to the original content.
            return content

    def __innerSubscribe(self, serverQueueName, callback):
        """Subscribe to ``serverQueueName`` and pump messages to ``callback``.

        Loops while ``self.subscribed`` is true. Each message is accepted,
        optionally decompressed, and then passed to ``callback``.
        """
        local_queue_name = 'local_queue_' + serverQueueName
        queue = self.__session.incoming(local_queue_name)
        self.__session.message_subscribe(serverQueueName, destination=local_queue_name)
        queue.start()
        print("Connection complete to broker on", self.host)
        self.__queueStarted = True

        while self.subscribed:
            try:
                message = queue.get(timeout=10)
                content = message.body
                self.__session.message_accept(qpid.datatypes.RangedSet(message.id))
                if self.decompress:
                    content = self._gunzip_or_passthrough(content)
                callback(content)
            except Empty:
                # Nothing arrived within the timeout; loop so that
                # self.subscribed is re-checked.
                pass
            except Closed:
                self.close()

    def close(self):
        """Stop the receive loop and close the session.

        A ValueError raised while closing the session is ignored.
        """
        self.__queueStarted = False
        self.subscribed = False
        try:
            self.__session.close(timeout=10)
        except ValueError:
            pass

    @property
    def queueStarted(self):
        """True once the local queue has been started and until ``close`` is called."""
        return self.__queueStarted
||
| 104 |