Passed
Pull Request — master (#1191)
by
unknown
05:29
created

ocrd_network.rabbitmq_utils.helpers   A

Complexity

Total Complexity 17

Size/Duplication

Total Lines 112
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 17
eloc 82
dl 0
loc 112
rs 10
c 0
b 0
f 0

7 Functions

Rating   Name   Duplication   Size   Complexity  
A connect_rabbitmq_consumer() 0 4 1
A verify_and_parse_mq_uri() 0 18 2
A verify_rabbitmq_available() 0 4 1
B __connect_rabbitmq_client() 0 32 6
A connect_rabbitmq_publisher() 0 7 2
A create_message_queues() 0 12 3
A check_if_queue_exists() 0 10 2
1
from logging import Logger
2
from pika import URLParameters
3
from pika.exceptions import AMQPConnectionError, ChannelClosedByBroker
4
from re import match as re_match
5
from time import sleep
6
from typing import Dict, List, Union
7
8
from .constants import RECONNECT_TRIES, RECONNECT_WAIT
9
from .consumer import RMQConsumer
10
from .publisher import RMQPublisher
11
12
13
def __connect_rabbitmq_client(
    logger: Logger,
    client_type: str,
    rmq_data: Dict,
    attempts: int = RECONNECT_TRIES,
    delay: int = RECONNECT_WAIT
) -> Union[RMQConsumer, RMQPublisher]:
    """
    Create a RabbitMQ consumer or publisher and connect it, retrying on connection errors.

    Args:
        logger: logger receiving the progress messages.
        client_type: either "consumer" or "publisher".
        rmq_data: mapping with the keys "host", "port", "vhost", "username" and "password".
        attempts: maximum number of connection attempts.
        delay: value passed to ``sleep`` between two failed attempts.

    Returns:
        The ``RMQConsumer`` or ``RMQPublisher`` on which ``authenticate_and_connect`` succeeded.

    Raises:
        Exception: if a required key is missing from ``rmq_data``.
        RuntimeError: if ``client_type`` is neither "consumer" nor "publisher",
            or if every attempt failed with an ``AMQPConnectionError``.
    """
    try:
        rmq_host: str = rmq_data["host"]
        rmq_port: int = rmq_data["port"]
        rmq_vhost: str = rmq_data["vhost"]
        rmq_username: str = rmq_data["username"]
        rmq_password: str = rmq_data["password"]
    except (KeyError, ValueError) as error:
        # A missing dictionary key raises KeyError, which the former `except ValueError` never caught
        raise Exception("Failed to parse RabbitMQ connection data") from error
    logger.info(f"Connecting client to RabbitMQ server: {rmq_host}:{rmq_port}{rmq_vhost}")
    # The password is deliberately kept out of the log output
    logger.debug(f"RabbitMQ client authenticates with username: {rmq_username}")
    last_error = None
    while attempts > 0:
        try:
            if client_type == "consumer":
                rmq_client = RMQConsumer(host=rmq_host, port=rmq_port, vhost=rmq_vhost)
            elif client_type == "publisher":
                rmq_client = RMQPublisher(host=rmq_host, port=rmq_port, vhost=rmq_vhost)
            else:
                # Not an AMQPConnectionError, so this propagates immediately without retrying
                raise RuntimeError(f"RabbitMQ client type can be either a consumer or publisher. Got: {client_type}")
            rmq_client.authenticate_and_connect(username=rmq_username, password=rmq_password)
            return rmq_client
        except AMQPConnectionError as error:
            last_error = error
            attempts -= 1
            # Waiting is only useful when another attempt follows
            if attempts > 0:
                sleep(delay)
    # Report the connection data without the password
    reported_data = {key: value for key, value in rmq_data.items() if key != "password"}
    raise RuntimeError(
        f"Failed to establish connection with the RabbitMQ Server. Connection data: {reported_data}"
    ) from last_error
45
46
47
def connect_rabbitmq_consumer(logger: Logger, rmq_data: Dict) -> RMQConsumer:
    """
    Connect a consumer client using the given connection data.

    Delegates to ``__connect_rabbitmq_client`` with the client type "consumer",
    logs the success and returns the resulting client.
    """
    consumer = __connect_rabbitmq_client(rmq_data=rmq_data, client_type="consumer", logger=logger)
    logger.info("Successfully connected RMQConsumer")
    return consumer
51
52
53
def connect_rabbitmq_publisher(logger: Logger, rmq_data: Dict, enable_acks: bool = True) -> RMQPublisher:
    """
    Connect a publisher client using the given connection data.

    Delegates to ``__connect_rabbitmq_client`` with the client type "publisher".
    When ``enable_acks`` is true, ``enable_delivery_confirmations()`` is called
    on the publisher before it is returned.
    """
    publisher = __connect_rabbitmq_client(rmq_data=rmq_data, client_type="publisher", logger=logger)
    status_messages = ["Successfully connected RMQPublisher"]
    if enable_acks:
        publisher.enable_delivery_confirmations()
        # Reported before the success message
        status_messages.insert(0, "Delivery confirmations are enabled")
    for status_message in status_messages:
        logger.info(status_message)
    return publisher
60
61
62
def check_if_queue_exists(logger: Logger, rmq_data: Dict, processor_name: str) -> bool:
    """
    Check whether a queue named ``processor_name`` exists on the RabbitMQ server.

    A temporary publisher is connected for the check and closed again before returning.

    Args:
        logger: logger receiving the warning when the queue is missing.
        rmq_data: connection data passed on to ``connect_rabbitmq_publisher``.
        processor_name: name of the queue to look for.

    Returns:
        True if the passive queue declaration succeeded,
        False if it raised ``ChannelClosedByBroker``.
    """
    rmq_publisher = connect_rabbitmq_publisher(logger, rmq_data)
    try:
        # Passively checks whether the queue name exists, if not raises ChannelClosedByBroker
        rmq_publisher.create_queue(processor_name, passive=True)
        return True
    except ChannelClosedByBroker as error:
        # The channel was forcibly closed by the RabbitMQ Server
        logger.warning(f"Process queue with id '{processor_name}' not existing: {error}")
        return False
    finally:
        # The publisher exists only for this check, so release its connection on every path
        try:
            rmq_publisher.close_connection()
        except AMQPConnectionError as close_error:
            # Best effort: a connection that is already gone must not change the result
            logger.debug(f"Closing the temporary RMQPublisher failed: {close_error}")
72
73
74
def create_message_queues(logger: Logger, rmq_publisher: RMQPublisher, queue_names: List[str]) -> None:
    """
    Create one message queue per given name, plus the "ocrd-dummy" queue.

    Args:
        logger: logger receiving one info message per created queue.
        rmq_publisher: publisher whose ``create_queue`` is called for every name.
        queue_names: names of the queues to create; the passed object is left unmodified.
    """
    # TODO: Reconsider and refactor this.
    #  Added ocrd-dummy by default if not available for the integration tests.
    #  A proper Processing Worker / Processor Server registration endpoint is needed on the Processing Server side
    # Work on a copy so that the caller's list does not silently gain an extra entry
    names_to_create = list(queue_names)
    if "ocrd-dummy" not in names_to_create:
        names_to_create.append("ocrd-dummy")

    for queue_name in names_to_create:
        # The existence/validity of the worker.name is not tested.
        # Even if an ocr-d processor does not exist, the queue is created
        logger.info(f"Creating a message queue with id: {queue_name}")
        rmq_publisher.create_queue(queue_name=queue_name)
86
87
88
def verify_and_parse_mq_uri(rabbitmq_address: str):
    """
    Validate a message queue server address and split it into connection parameters.

    The address is first matched against a generic URI pattern; a ValueError is raised
    when it does not match. It is then parsed with pika's URLParameters.
    Check the full list of available parameters in the docs here:
    https://pika.readthedocs.io/en/stable/_modules/pika/connection.html#URLParameters

    Returns a dict with the keys "username", "password", "host", "port" and "vhost".
    """
    uri_pattern = r"^(?:([^:\/?#\s]+):\/{2})?(?:([^@\/?#\s]+)@)?([^\/?#\s]+)?(?:\/([^?#\s]*))?(?:[?]([^#\s]+))?\S*$"
    if re_match(pattern=uri_pattern, string=rabbitmq_address) is None:
        raise ValueError(f"The message queue server address is in wrong format: '{rabbitmq_address}'")
    connection_params = URLParameters(rabbitmq_address)
    credentials = connection_params.credentials
    return dict(
        username=credentials.username,
        password=credentials.password,
        host=connection_params.host,
        port=connection_params.port,
        vhost=connection_params.virtual_host,
    )
106
107
108
def verify_rabbitmq_available(logger: Logger, rabbitmq_address: str) -> None:
    """
    Verify that a publisher can connect to the server at the given address.

    The address is parsed with ``verify_and_parse_mq_uri``, a publisher is connected
    with the parsed data, and its connection is closed again right away.
    """
    connection_data = verify_and_parse_mq_uri(rabbitmq_address=rabbitmq_address)
    # The publisher is only needed to prove that connecting works
    connect_rabbitmq_publisher(logger, connection_data, enable_acks=True).close_connection()
112