ocrd_network.rabbitmq_utils.connector   C
last analyzed

Complexity

Total Complexity 57

Size/Duplication

Total Lines 282
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 57
eloc 202
dl 0
loc 282
rs 5.04
c 0
b 0
f 0

19 Methods

Rating   Name   Duplication   Size   Complexity  
A RMQConnector.open_blocking_channel() 0 6 3
A RMQConnector._authenticate_and_connect() 0 11 3
A RMQConnector.confirm_delivery() 0 6 3
A RMQConnector.create_queue() 0 9 1
A RMQConnector.queue_unbind() 0 15 4
A RMQConnector.exchange_declare() 0 24 4
A RMQConnector.queue_bind() 0 10 4
A RMQConnector.exchange_unbind() 0 16 4
A RMQConnector.queue_declare() 0 24 4
A RMQConnector.queue_purge() 0 4 3
A RMQConnector.exchange_bind() 0 16 4
A RMQConnector.basic_publish() 0 13 3
A RMQConnector.exchange_delete() 0 7 3
A RMQConnector.queue_delete() 0 14 3
A RMQConnector.set_qos() 0 14 3
A RMQConnector.declare_and_bind_defaults() 0 18 5
A RMQConnector.__init__() 0 15 1
A RMQConnector.open_blocking_connection() 0 15 1
A RMQConnector.close_connection() 0 2 1

How to fix   Complexity   

Complexity

Complex classes like ocrd_network.rabbitmq_utils.connector often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
The source code in this file is adapted by reusing
3
some part of the source code from the official
4
RabbitMQ documentation.
5
"""
6
from typing import Any, Optional, Union
7
from pika import BasicProperties, BlockingConnection, ConnectionParameters, PlainCredentials
8
from pika.adapters.blocking_connection import BlockingChannel
9
from ocrd_utils import config
10
from .constants import (
11
    DEFAULT_EXCHANGER_NAME,
12
    DEFAULT_EXCHANGER_TYPE,
13
    DEFAULT_QUEUE,
14
    DEFAULT_ROUTER,
15
    RABBIT_MQ_HOST,
16
    RABBIT_MQ_PORT,
17
    RABBIT_MQ_VHOST,
18
    PREFETCH_COUNT
19
)
20
21
22
class RMQConnector:
23
    def __init__(self, host: str = RABBIT_MQ_HOST, port: int = RABBIT_MQ_PORT, vhost: str = RABBIT_MQ_VHOST) -> None:
24
        self._host = host
25
        self._port = port
26
        self._vhost = vhost
27
28
        # According to the documentation, Pika blocking
29
        # connections are not thread-safe!
30
        self._connection = None
31
        self._channel = None
32
33
        # Should try reconnecting again
34
        self._try_reconnecting = False
35
        # If the module has been stopped with a
36
        # keyboard interruption, i.e., CTRL + C
37
        self._gracefully_stopped = False
38
39
    def close_connection(self, reply_code: int = 200, reply_text: str = "Normal shutdown"):
40
        self._connection.close(reply_code=reply_code, reply_text=reply_text)
41
42
    @staticmethod
43
    def declare_and_bind_defaults(connection: BlockingConnection, channel: BlockingChannel) -> None:
44
        if connection and connection.is_open:
45
            if channel and channel.is_open:
46
                # Declare the default exchange agent
47
                RMQConnector.exchange_declare(
48
                    channel=channel, exchange_name=DEFAULT_EXCHANGER_NAME, exchange_type=DEFAULT_EXCHANGER_TYPE,
49
                )
50
                # Declare the default queue
51
                RMQConnector.queue_declare(channel, queue_name=DEFAULT_QUEUE)
52
                # Bind the default queue to the default exchange
53
                RMQConnector.queue_bind(
54
                    channel=channel, queue_name=DEFAULT_QUEUE,
55
                    exchange_name=DEFAULT_EXCHANGER_NAME, routing_key=DEFAULT_ROUTER
56
                )
57
                return
58
            raise ConnectionError("The channel is missing or closed.")
59
        raise ConnectionError("The connection is missing or closed.")
60
61
    # Connection related methods
62
    @staticmethod
63
    def open_blocking_connection(
64
        credentials: PlainCredentials,
65
        host: str = RABBIT_MQ_HOST, port: int = RABBIT_MQ_PORT, vhost: str = RABBIT_MQ_VHOST
66
    ) -> BlockingConnection:
67
        blocking_connection = BlockingConnection(
68
            parameters=ConnectionParameters(
69
                host=host,
70
                port=port,
71
                virtual_host=vhost,
72
                credentials=credentials,
73
                heartbeat=config.OCRD_NETWORK_RABBITMQ_HEARTBEAT
74
            ),
75
        )
76
        return blocking_connection
77
78
    @staticmethod
79
    def open_blocking_channel(connection: BlockingConnection) -> Union[BlockingChannel, None]:
80
        if connection and connection.is_open:
81
            channel = connection.channel()
82
            return channel
83
        raise ConnectionError("The connection is missing or closed.")
84
85
    def _authenticate_and_connect(self, username: str, password: str) -> None:
86
        # Delete credentials once connected
87
        credentials = PlainCredentials(username=username, password=password, erase_on_connect=False)
88
        self._connection = RMQConnector.open_blocking_connection(
89
            host=self._host, port=self._port, vhost=self._vhost, credentials=credentials,
90
        )
91
        self._channel = RMQConnector.open_blocking_channel(self._connection)
92
        if not self._connection:
93
            raise ConnectionError("The connection is missing or closed.")
94
        if not self._channel:
95
            raise ConnectionError("The channel is missing or closed.")
96
97
    @staticmethod
98
    def exchange_bind(
99
        channel: BlockingChannel, destination_exchange: str, source_exchange: str, routing_key: str,
100
        arguments: Optional[Any] = None
101
    ) -> None:
102
        if arguments is None:
103
            arguments = {}
104
        if channel and channel.is_open:
105
            channel.exchange_bind(
106
                destination=destination_exchange,
107
                source=source_exchange,
108
                routing_key=routing_key,
109
                arguments=arguments
110
            )
111
            return
112
        raise ConnectionError("The channel is missing or closed.")
113
114
    @staticmethod
115
    def exchange_declare(
116
        channel: BlockingChannel, exchange_name: str, exchange_type: str, passive: bool = False, durable: bool = False,
117
        auto_delete: bool = False, internal: bool = False, arguments: Optional[Any] = None
118
    ) -> None:
119
        if arguments is None:
120
            arguments = {}
121
        if channel and channel.is_open:
122
            channel.exchange_declare(
123
                exchange=exchange_name,
124
                exchange_type=exchange_type,
125
                # Only check to see if the exchange exists
126
                passive=passive,
127
                # Survive a reboot of RabbitMQ
128
                durable=durable,
129
                # Remove when no more queues are bound to it
130
                auto_delete=auto_delete,
131
                # Can only be published to by other exchanges
132
                internal=internal,
133
                # Custom key/value pair arguments for the exchange
134
                arguments=arguments
135
            )
136
            return
137
        raise ConnectionError("The channel is missing or closed.")
138
139
    @staticmethod
140
    def exchange_delete(channel: BlockingChannel, exchange_name: str, if_unused: bool = False) -> None:
141
        # Deletes queue only if unused
142
        if channel and channel.is_open:
143
            channel.exchange_delete(exchange=exchange_name, if_unused=if_unused)
144
            return
145
        raise ConnectionError("The channel is missing or closed.")
146
147
    @staticmethod
148
    def exchange_unbind(
149
        channel: BlockingChannel, destination_exchange: str, source_exchange: str, routing_key: str,
150
        arguments: Optional[Any] = None
151
    ) -> None:
152
        if arguments is None:
153
            arguments = {}
154
        if channel and channel.is_open:
155
            channel.exchange_unbind(
156
                destination=destination_exchange,
157
                source=source_exchange,
158
                routing_key=routing_key,
159
                arguments=arguments
160
            )
161
            return
162
        raise ConnectionError("The channel is missing or closed.")
163
164
    @staticmethod
165
    def queue_bind(
166
        channel: BlockingChannel, queue_name: str, exchange_name: str, routing_key: str, arguments: Optional[Any] = None
167
    ) -> None:
168
        if arguments is None:
169
            arguments = {}
170
        if channel and channel.is_open:
171
            channel.queue_bind(queue=queue_name, exchange=exchange_name, routing_key=routing_key, arguments=arguments)
172
            return
173
        raise ConnectionError("The channel is missing or closed.")
174
175
    @staticmethod
176
    def queue_declare(
177
        channel: BlockingChannel, queue_name: str, passive: bool = False, durable: bool = False,
178
        exclusive: bool = False, auto_delete: bool = False, arguments: Optional[Any] = None
179
    ) -> None:
180
        if arguments is None:
181
            arguments = {}
182
        if channel and channel.is_open:
183
            queue = channel.queue_declare(
184
                queue=queue_name,
185
                # Only check to see if the queue exists and
186
                # raise ChannelClosed exception if it does not
187
                passive=passive,
188
                # Survive reboots of the server
189
                durable=durable,
190
                # Only allow access by the current connection
191
                exclusive=exclusive,
192
                # Delete after consumer cancels or disconnects
193
                auto_delete=auto_delete,
194
                # Custom key/value pair arguments for the queue
195
                arguments=arguments
196
            )
197
            return queue
198
        raise ConnectionError("The channel is missing or closed.")
199
200
    @staticmethod
201
    def queue_delete(
202
        channel: BlockingChannel, queue_name: str, if_unused: bool = False, if_empty: bool = False
203
    ) -> None:
204
        if channel and channel.is_open:
205
            channel.queue_delete(
206
                queue=queue_name,
207
                # Only delete if the queue is unused
208
                if_unused=if_unused,
209
                # Only delete if the queue is empty
210
                if_empty=if_empty
211
            )
212
            return
213
        raise ConnectionError("The channel is missing or closed.")
214
215
    @staticmethod
216
    def queue_purge(channel: BlockingChannel, queue_name: str) -> None:
217
        if channel and channel.is_open:
218
            channel.queue_purge(queue=queue_name)
219
220
    @staticmethod
221
    def queue_unbind(
222
        channel: BlockingChannel, queue_name: str, exchange_name: str, routing_key: str, arguments: Optional[Any] = None
223
    ) -> None:
224
        if arguments is None:
225
            arguments = {}
226
        if channel and channel.is_open:
227
            channel.queue_unbind(
228
                queue=queue_name,
229
                exchange=exchange_name,
230
                routing_key=routing_key,
231
                arguments=arguments
232
            )
233
            return
234
        raise ConnectionError("The channel is missing or closed.")
235
236
    def create_queue(
237
        self, queue_name: str, exchange_name: Optional[str] = DEFAULT_EXCHANGER_NAME,
238
        exchange_type: Optional[str] = "direct", passive: bool = False
239
    ) -> None:
240
        RMQConnector.exchange_declare(channel=self._channel, exchange_name=exchange_name, exchange_type=exchange_type)
241
        RMQConnector.queue_declare(channel=self._channel, queue_name=queue_name, passive=passive)
242
        # The queue name is used as a routing key, to keep implementation simple
243
        RMQConnector.queue_bind(
244
            channel=self._channel, queue_name=queue_name, exchange_name=exchange_name, routing_key=queue_name
245
        )
246
247
    @staticmethod
248
    def set_qos(
249
        channel: BlockingChannel, prefetch_size: int = 0, prefetch_count: int = PREFETCH_COUNT, global_qos: bool = False
250
    ) -> None:
251
        if channel and channel.is_open:
252
            channel.basic_qos(
253
                # No specific limit if set to 0
254
                prefetch_size=prefetch_size,
255
                prefetch_count=prefetch_count,
256
                # Should the qos apply to all channels of the connection
257
                global_qos=global_qos
258
            )
259
            return
260
        raise ConnectionError("The channel is missing or closed.")
261
262
    @staticmethod
263
    def confirm_delivery(channel: BlockingChannel) -> None:
264
        if channel and channel.is_open:
265
            channel.confirm_delivery()
266
            return
267
        raise ConnectionError("The channel is missing or closed.")
268
269
    @staticmethod
270
    def basic_publish(
271
        channel: BlockingChannel, exchange_name: str, routing_key: str, message_body: bytes, properties: BasicProperties
272
    ) -> None:
273
        if channel and channel.is_open:
274
            channel.basic_publish(
275
                exchange=exchange_name,
276
                routing_key=routing_key,
277
                body=message_body,
278
                properties=properties
279
            )
280
            return
281
        raise ConnectionError("The channel is missing or closed.")
282