Issues (227)

awips/QpidSubscriber.py (1 issue)

1
#
2
# Provides a Python-based interface for subscribing to qpid queues and topics.
3
#
4
#
5
#
6
#     SOFTWARE HISTORY
7
#
8
#    Date            Ticket#       Engineer       Description
9
#    ------------    ----------    -----------    --------------------------
10
#    11/17/10                      njensen       Initial Creation.
11
#    08/15/13        2169          bkowal        Optionally gzip decompress any data that is read.
12
#    08/04/16        2416          tgurney       Add queueStarted property
13
#    02/16/17        6084          bsteffen      Support ssl connections
14
#    09/07/17        6175          tgurney       Remove "decompressing" log message
15
#
16
#
17
18
import os
19
import os.path
20
import qpid
21
import zlib
22
23
from Queue import Empty
24
from qpid.exceptions import Closed
25
26
27 View Code Duplication
class QpidSubscriber:
    """Subscribe to a qpid topic and pass each received message body to a callback.

    The constructor opens the broker connection and a session.
    topicSubscribe() then blocks, delivering messages until close() is
    called (typically from another thread or from inside the callback)
    or the connection reports Closed.

    Public attributes:
        host, port: broker address given to the constructor.
        decompress: when true, message bodies are gzip-decompressed before
            delivery; bodies that fail to decompress are delivered as-is.
        subscribed: loop flag; set to False by close().
    """

    def __init__(self, host='127.0.0.1', port=5672, decompress=False, ssl=None):
        """Connect to the broker and start a session.

        Args:
            host: broker host name or address.
            port: broker port.
            decompress: whether message bodies should be gzip-decompressed.
            ssl: True forces an SSL connection; a false value other than
                None disables SSL; None (the default) uses SSL only when
                the client certificate file exists.

        The certificate directory comes from the QPID_SSL_CERT_DB
        environment variable (default ``~/.qpid/``) and the certificate
        base name from QPID_SSL_CERT_NAME (default ``guest``).
        """
        self.host = host
        self.port = port
        self.decompress = decompress
        socket = qpid.util.connect(host, port)

        certdb = os.environ.get("QPID_SSL_CERT_DB", os.path.expanduser("~/.qpid/"))
        certname = os.environ.get("QPID_SSL_CERT_NAME", "guest")
        certfile = os.path.join(certdb, certname + ".crt")
        if ssl or (ssl is None and os.path.exists(certfile)):
            keyfile = os.path.join(certdb, certname + ".key")
            trustfile = os.path.join(certdb, "root.crt")
            socket = qpid.util.ssl(socket, keyfile=keyfile, certfile=certfile,
                                   ca_certs=trustfile)

        self.__connection = qpid.connection.Connection(sock=socket, username='guest',
                                                       password='guest')
        self.__connection.start()
        self.__session = self.__connection.session(str(qpid.datatypes.uuid4()))
        self.subscribed = True
        self.__queueStarted = False

    def topicSubscribe(self, topicName, callback):
        """Bind a private queue to ``topicName`` and deliver messages to ``callback``.

        Blocks until the subscription ends (see close()).

        Args:
            topicName: binding key on the ``amq.topic`` exchange.
            callback: callable invoked with each message body.
        """
        # if the queue is edex.alerts, set decompress to true always for now to
        # maintain compatibility with existing python scripts.
        if topicName == 'edex.alerts':
            self.decompress = True

        print("Establishing connection to broker on", self.host)
        # The session name is appended so each subscriber gets its own queue.
        queueName = topicName + self.__session.name
        # NOTE(review): the arguments presumably make this a ring queue capped
        # at 100 messages -- confirm against the broker documentation.
        self.__session.queue_declare(queue=queueName, exclusive=True, auto_delete=True,
                                     arguments={'qpid.max_count': 100,
                                                'qpid.policy_type': 'ring'})
        self.__session.exchange_bind(exchange='amq.topic', queue=queueName,
                                     binding_key=topicName)
        self.__innerSubscribe(queueName, callback)

    def __maybeDecompress(self, content):
        """Return ``content`` gzip-decompressed when self.decompress is set.

        If decompression is disabled, or the data cannot be decompressed,
        the original content is returned unchanged.
        """
        if not self.decompress:
            return content
        try:
            # http://stackoverflow.com/questions/2423866/python-decompressing-gzip-chunk-by-chunk
            # 16 + MAX_WBITS tells zlib to expect a gzip header and trailer.
            decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
            return decompressor.decompress(content)
        except (ValueError, zlib.error):
            # zlib reports malformed input as zlib.error, which is not a
            # ValueError; catch both so non-gzip bodies fall back to the
            # original content instead of aborting the subscription loop.
            return content

    def __innerSubscribe(self, serverQueueName, callback):
        """Subscribe to ``serverQueueName`` and pump messages into ``callback``.

        Loops while self.subscribed is true. Each message is accepted
        before its (optionally decompressed) body is passed to the callback.
        """
        local_queue_name = 'local_queue_' + serverQueueName
        queue = self.__session.incoming(local_queue_name)
        self.__session.message_subscribe(serverQueueName, destination=local_queue_name)
        queue.start()
        print("Connection complete to broker on", self.host)
        self.__queueStarted = True

        while self.subscribed:
            try:
                # The timeout makes the loop re-check self.subscribed even
                # when no messages arrive.
                message = queue.get(timeout=10)
                content = message.body
                self.__session.message_accept(qpid.datatypes.RangedSet(message.id))
                callback(self.__maybeDecompress(content))
            except Empty:
                # Nothing arrived within the timeout; poll again.
                pass
            except Closed:
                self.close()

    def close(self):
        """Stop the subscription loop and close the session.

        A ValueError raised while closing the session is ignored.
        """
        self.__queueStarted = False
        self.subscribed = False
        try:
            self.__session.close(timeout=10)
        except ValueError:
            pass
        # NOTE(review): the underlying connection is never closed here;
        # confirm whether that is intentional before adding a close call.

    @property
    def queueStarted(self):
        """True once the local queue has been started; reset to False by close()."""
        return self.__queueStarted
104