helpers.queuehelper   A
last analyzed

Complexity

Total Complexity 38

Size/Duplication

Total Lines 223
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 144
dl 0
loc 223
rs 9.36
c 0
b 0
f 0
wmc 38

32 Methods

Rating   Name   Duplication   Size   Complexity  
A Queue.publish_channel() 0 5 1
A BroadcastListener.initialize() 0 6 1
A Queue.setupchannel() 0 4 1
A Queue.getmessage() 0 6 2
A Queue.publish() 0 4 2
A BroadcastSender.initialize() 0 4 1
A BroadcastBase.setupbroadcast() 0 3 1
A QueueName.value() 0 3 1
A Queue.connection() 0 2 1
A QueueEntries.__init__() 0 2 1
A Queue.initialize() 0 4 1
A QueueEntries.addbroadcast() 0 2 1
A BroadcastBase.setupexchange() 0 6 1
A Queue.declare() 0 3 1
A Queue.subscribe() 0 4 1
A QueueEntries.addalert() 0 2 1
A Queue.listen() 0 3 1
A QueueName.has_value() 0 3 1
A BroadcastBase.initialize() 0 6 1
A BroadcastListener.declarechannel() 0 5 1
A Queue.sleep() 0 3 1
A QueueEntry.__init__() 0 4 1
A Queue.broadcast_channel() 0 5 1
A QueueEntries.add() 0 2 1
A Queue.acknowledge() 0 3 1
A Queue.reject() 0 3 1
A Queue.getparameters() 0 4 1
A Queue.__init__() 0 7 2
A QueueEntries.hasentries() 0 4 2
A BroadcastSender.broadcast() 0 4 2
A Queue.close() 0 7 2
A QueueName.__str__() 0 2 1
1
'''messagebug (queue) related functions'''
2
from enum import Enum
3
import pika
4
5
#class NoValue(Enum):
6
#    def __repr__(self):
7
#        return '<%s.%s>' % (self.__class__.__name__, self.name)
8
9
class QueueName(Enum):
10
    """Known standard queue names
11
    You may be able to define and use other queue names
12
    but you're on your own
13
    """
14
    Q_DUMMY = 'dummy'
15
    Q_COMPONENT = 'component'
16
    Q_LOG = 'log'
17
    Q_PROVISION = 'provision'
18
    Q_SWITCH = 'switch'
19
    Q_RESTART = 'restart'
20
    Q_ALERT = 'alert'
21
    Q_DISCOVER = 'discover'
22
    Q_DISCOVERED = 'discovered'
23
    Q_MONITOR = 'monitor'
24
    Q_MONITORMINER = 'monitorminer'
25
    Q_SHUTDOWN = 'shutdown'
26
    Q_OFFLINE = 'offline'
27
    Q_ONLINE = 'online'
28
    Q_STATISTICSUPDATED = 'statisticsupdated'
29
    Q_POOLCONFIGURATIONCHANGED = 'poolconfigurationchanged'
30
    Q_CLOUDPULL = 'cloudpull'
31
    Q_EMAIL = 'email'
32
    Q_SENSOR = 'sensor'
33
    Q_UPDATEWEB = 'updateweb'
34
    Q_SAVE = 'save'
35
36
    @classmethod
37
    def value(cls, queue_name):
38
        return queue_name._name_.lower()[2:]
39
40
    @classmethod
41
    def has_value(cls, queue_name):
42
        return any(queue_name == item.value for item in cls)
43
44
    def __str__(self):
45
        return "%s" % (self._name_.lower())
46
47
class QueueType:
48
    broadcast = 'broadcast'
49
    publish = 'publish'
50
51
52
class QueueEntry(object):
53
    '''An entry that will go into a queue'''
54
    def __init__(self, queuename, message, eventtype=QueueType.publish):
55
        self.queuename = queuename
56
        self.eventtype = eventtype
57
        self.message = message
58
59
60
class QueueEntries(object):
61
    '''a list of queue entries'''
62
    def __init__(self):
63
        self.entries = []
64
65
    def add(self, queuename, message):
66
        self.entries.append(QueueEntry(queuename, message, QueueType.publish))
67
68
    def addbroadcast(self, queuename, message):
69
        self.entries.append(QueueEntry(queuename, message, QueueType.broadcast))
70
71
    def addalert(self, message):
72
        self.addbroadcast(QueueName.Q_ALERT, message)
73
74
    def hasentries(self):
75
        if self.entries is None:
76
            return 0
77
        return len(self.entries)
78
79
80
class Queue:
81
    """
82
    Wrapper around a rabbitmq queue
83
    """
84
    queue_name = None
85
    _connection = None
86
    channel = None
87
    state = None
88
    _userlogin = None
89
90
    def __init__(self, queueName, servicelogin):
91
        self.queue_name = queueName
92
        self._servicelogin = servicelogin
93
        self._userlogin = self._servicelogin.user
94
        if self._userlogin is None:
95
            self._userlogin = 'fullcycle'
96
        self.initialize(queueName)
97
98
    def connection(self):
99
        return self._connection
100
101
    def getparameters(self):
102
        credentials = pika.PlainCredentials(self._userlogin, self._servicelogin.password)
103
        parameters = pika.ConnectionParameters(self._servicelogin.host, self._servicelogin.port, '/', credentials)
104
        return parameters
105
106
    def initialize(self, name):
107
        '''init'''
108
        self.setupchannel()
109
        self.declare(name)
110
111
    def setupchannel(self):
112
        '''create the channel. also creates connection'''
113
        self._connection = pika.BlockingConnection(self.getparameters())
114
        self.channel = self._connection.channel()
115
116
    def declare(self, name):
117
        """Creates the queue"""
118
        self.state = self.channel.queue_declare(queue=name)
119
120
    def publish(self, msg, exchange=''):
121
        """Publishes message for one consumer"""
122
        if self.channel != None:
123
            self.channel.basic_publish(exchange=exchange, routing_key=self.queue_name, body=msg)
124
125
    def publish_channel(self, queue_name, msg, exchange=''):
126
        """Publishes message on new channel"""
127
        localchannel = self._connection.channel()
128
        localchannel.basic_publish(exchange=exchange, routing_key=queue_name, body=msg)
129
        localchannel.close()
130
131
    def broadcast_channel(self, exchange_name, msg):
132
        '''broadcast a message to anyone that is listening'''
133
        localchannel = self._connection.channel()
134
        localchannel.basic_publish(exchange=exchange_name, routing_key='', body=msg)
135
        localchannel.close()
136
137
    def subscribe(self, callback, no_acknowledge=True, prefetch_count=1):
138
        """Consumes messages from one queue"""
139
        self.channel.basic_qos(prefetch_count=prefetch_count)
140
        self.channel.basic_consume(callback, queue=self.queue_name, no_ack=no_acknowledge)
141
142
    def listen(self):
143
        '''listen to queue'''
144
        self.channel.start_consuming()
145
146
    def sleep(self, duration):
147
        '''call this periodically so the connection does not time out'''
148
        self._connection.sleep(duration)
149
150
    def getmessage(self, no_acknowledge=True):
151
        """if not listening then you can get message in a loop"""
152
        queue_empty = self.state.method.message_count == 0
153
        if not queue_empty:
154
            return self.channel.basic_get(self.queue_name, no_ack=no_acknowledge)
155
        return (None, None, None)
156
157
    def acknowledge(self, delivery_tag):
158
        '''acknowledge that message was processed'''
159
        self.channel.basic_ack(delivery_tag)
160
161
    def reject(self, delivery_tag):
162
        '''tell queue that message was not processed'''
163
        self.channel.basic_nack(delivery_tag)
164
165
    def close(self):
166
        """close the queue"""
167
        if self.channel:
168
            self.channel.close()
169
            self.channel = None
170
            self._connection = None
171
            self.state = None
172
173
class BroadcastBase(Queue):
174
    '''a queue for broadcasting messages to multiple recipients
175
    todo: should be abstract'''
176
    _exchangename = None
177
    _exchangetype = None
178
179
    def initialize(self, name):
180
        '''init'''
181
        self.setupchannel()
182
        self.setupbroadcast(name)
183
        #do not need to declare when sending!
184
        self.declare(name)
185
186
    def setupexchange(self, name, exchange_type):
187
        '''set up the exchange for broadcasting'''
188
        self._exchangename = name
189
        self._exchangetype = exchange_type
190
        self.channel.exchange_declare(exchange=name, exchange_type=exchange_type)
191
        return self
192
193
    def setupbroadcast(self, name):
194
        '''set up the exchange for broadcasting'''
195
        return self.setupexchange(name, 'fanout')
196
197
class BroadcastSender(BroadcastBase):
198
    def initialize(self, name):
199
        '''init'''
200
        self.setupchannel()
201
        self.setupbroadcast(name)
202
        #no declare when sending
203
204
    def broadcast(self, msg):
205
        '''broadcast a message to anyone that is listening'''
206
        if self.channel != None:
207
            self.channel.basic_publish(exchange=self._exchangename, routing_key='', body=msg)
208
209
class BroadcastListener(BroadcastBase):
210
211
    def initialize(self, name):
212
        '''init'''
213
        self.setupchannel()
214
        self.setupbroadcast(name)
215
        #only need to declare channel when listening
216
        self.declarechannel()
217
218
    def declarechannel(self):
219
        '''declare the channel'''
220
        result = self.channel.queue_declare(exclusive=True)
221
        self.queue_name = result.method.queue
222
        self.channel.queue_bind(exchange=self._exchangename, queue=self.queue_name)
223