Passed
Pull Request — master (#1191)
by
unknown
02:56
created

ocrd_network.rabbitmq_utils.helpers   A

Complexity

Total Complexity 17

Size/Duplication

Total Lines 108
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 17
eloc 78
dl 0
loc 108
rs 10
c 0
b 0
f 0

7 Functions

Rating   Name   Duplication   Size   Complexity  
A connect_rabbitmq_consumer() 0 4 1
A verify_and_parse_mq_uri() 0 18 2
A verify_rabbitmq_available() 0 4 1
B __connect_rabbitmq_client() 0 28 6
A connect_rabbitmq_publisher() 0 7 2
A create_message_queues() 0 12 3
A check_if_queue_exists() 0 10 2
1
from logging import Logger
2
from pika import URLParameters
3
from pika.exceptions import AMQPConnectionError, ChannelClosedByBroker
4
from re import match as re_match
5
from time import sleep
6
from typing import Dict, List, Union
7
8
from .constants import RECONNECT_TRIES, RECONNECT_WAIT
9
from .consumer import RMQConsumer
10
from .publisher import RMQPublisher
11
12
13
def __connect_rabbitmq_client(
    logger: Logger, client_type: str, rmq_data: Dict, attempts: int = RECONNECT_TRIES, delay: int = RECONNECT_WAIT
) -> Union[RMQConsumer, RMQPublisher]:
    """
    Create a RabbitMQ consumer or publisher and connect it to the server,
    retrying when the connection attempt fails with ``AMQPConnectionError``.

    :param logger: logger used for the progress messages
    :param client_type: either "consumer" or "publisher"
    :param rmq_data: connection data with the keys "host", "port", "vhost", "username" and "password"
    :param attempts: number of connection attempts made before giving up
    :param delay: value passed to ``sleep`` between two failed attempts
    :return: the authenticated and connected client instance
    :raises Exception: if ``rmq_data`` lacks one of the required keys
    :raises RuntimeError: if ``client_type`` is unknown, or if every attempt failed
    """
    try:
        rmq_host: str = rmq_data["host"]
        rmq_port: int = rmq_data["port"]
        rmq_vhost: str = rmq_data["vhost"]
        rmq_username: str = rmq_data["username"]
        rmq_password: str = rmq_data["password"]
    except (KeyError, ValueError) as error:
        # A missing dictionary key raises KeyError, which the former `except ValueError` never caught
        raise Exception("Failed to parse RabbitMQ connection data") from error

    # Reject an unknown client type up front instead of inside the retry loop
    client_classes = {"consumer": RMQConsumer, "publisher": RMQPublisher}
    if client_type not in client_classes:
        raise RuntimeError(f"RabbitMQ client type can be either a consumer or publisher. Got: {client_type}")
    client_class = client_classes[client_type]

    logger.info(f"Connecting client to RabbitMQ server: {rmq_host}:{rmq_port}{rmq_vhost}")
    # The password is deliberately masked: credentials must not end up in log files
    logger.debug(f"RabbitMQ client authenticates with username: {rmq_username}, password: ***")
    while attempts > 0:
        try:
            rmq_client = client_class(host=rmq_host, port=rmq_port, vhost=rmq_vhost)
            rmq_client.authenticate_and_connect(username=rmq_username, password=rmq_password)
            return rmq_client
        except AMQPConnectionError:
            attempts -= 1
            # Waiting is only useful when another attempt follows
            if attempts > 0:
                sleep(delay)

    # Mask the password in the error message as well
    redacted_data = {**rmq_data, "password": "***"}
    raise RuntimeError(f"Failed to establish connection with the RabbitMQ Server. Connection data: {redacted_data}")
41
42
43
def connect_rabbitmq_consumer(logger: Logger, rmq_data: Dict) -> RMQConsumer:
    """
    Connect a consumer client to the RabbitMQ server described by ``rmq_data``.

    :param logger: logger that receives the success message
    :param rmq_data: connection data forwarded unchanged to the shared connect helper
    :return: the client returned by the shared connect helper for the type "consumer"
    """
    consumer = __connect_rabbitmq_client(logger, "consumer", rmq_data)
    logger.info("Successfully connected RMQConsumer")
    return consumer
47
48
49
def connect_rabbitmq_publisher(logger: Logger, rmq_data: Dict, enable_acks: bool = True) -> RMQPublisher:
    """
    Connect a publisher client to the RabbitMQ server described by ``rmq_data``.

    :param logger: logger that receives the status messages
    :param rmq_data: connection data forwarded unchanged to the shared connect helper
    :param enable_acks: when true, delivery confirmations are switched on for the new publisher
    :return: the client returned by the shared connect helper for the type "publisher"
    """
    publisher = __connect_rabbitmq_client(logger, "publisher", rmq_data)
    if enable_acks:
        # Switch on delivery confirmations before handing the publisher out
        publisher.enable_delivery_confirmations()
        logger.info("Delivery confirmations are enabled")
    logger.info("Successfully connected RMQPublisher")
    return publisher
56
57
58
def check_if_queue_exists(logger: Logger, rmq_data: Dict, processor_name: str) -> bool:
    """
    Check whether a queue named ``processor_name`` exists on the RabbitMQ server.

    A temporary publisher is connected for the check and closed again afterwards.

    :param logger: logger used for status and warning messages
    :param rmq_data: connection data used to connect the temporary publisher
    :param processor_name: name of the queue to look for
    :return: True if the passive queue declaration succeeded, False if the broker closed the channel
    """
    rmq_publisher = connect_rabbitmq_publisher(logger, rmq_data)
    try:
        # Passively checks whether the queue name exists, if not raises ChannelClosedByBroker
        rmq_publisher.create_queue(processor_name, passive=True)
        return True
    except ChannelClosedByBroker as error:
        # The broker rejected the passive declaration and closed the channel
        logger.warning(f"Process queue with id '{processor_name}' not existing: {error}")
        return False
    finally:
        # The publisher only exists for this check; release its connection instead of leaking it.
        # Closing is best-effort so that a failing close never masks the result of the check.
        try:
            rmq_publisher.close_connection()
        except Exception as close_error:
            logger.debug(f"Ignoring error while closing the temporary RabbitMQ publisher: {close_error}")
68
69
70
def create_message_queues(logger: Logger, rmq_publisher: RMQPublisher, queue_names: List[str]) -> None:
    """
    Create one message queue per name in ``queue_names``, plus the "ocrd-dummy" queue.

    The passed list is left untouched; the default queue is added to a private copy.

    :param logger: logger used to announce every queue that gets created
    :param rmq_publisher: publisher whose ``create_queue`` method is called for every name
    :param queue_names: names of the queues to create
    """
    # Work on a copy so the caller's list is not modified as a side effect
    names_to_create = list(queue_names)

    # TODO: Reconsider and refactor this.
    #  Added ocrd-dummy by default if not available for the integration tests.
    #  A proper Processing Worker / Processor Server registration endpoint is needed on the Processing Server side
    if "ocrd-dummy" not in names_to_create:
        names_to_create.append("ocrd-dummy")

    for queue_name in names_to_create:
        # The existence/validity of the worker.name is not tested.
        # Even if an ocr-d processor does not exist, the queue is created
        logger.info(f"Creating a message queue with id: {queue_name}")
        rmq_publisher.create_queue(queue_name=queue_name)
82
83
84
def verify_and_parse_mq_uri(rabbitmq_address: str):
    """
    Validate the format of a message queue address and split it into its connection parts.

    The address is first matched against a generic URI pattern and then handed to
    pika's ``URLParameters``. The full list of parameters accepted there is documented at:
    https://pika.readthedocs.io/en/stable/_modules/pika/connection.html#URLParameters

    :param rabbitmq_address: address of the message queue server
    :return: dictionary with the keys "username", "password", "host", "port" and "vhost"
    :raises ValueError: if the address does not match the URI pattern
    """
    # Groups, in order: scheme, user info, host[:port], path, query
    address_regex = r"^(?:([^:\/?#\s]+):\/{2})?(?:([^@\/?#\s]+)@)?([^\/?#\s]+)?(?:\/([^?#\s]*))?(?:[?]([^#\s]+))?\S*$"
    if re_match(pattern=address_regex, string=rabbitmq_address) is None:
        raise ValueError(f"The message queue server address is in wrong format: '{rabbitmq_address}'")

    connection_params = URLParameters(rabbitmq_address)
    return {
        "username": connection_params.credentials.username,
        "password": connection_params.credentials.password,
        "host": connection_params.host,
        "port": connection_params.port,
        "vhost": connection_params.virtual_host,
    }
102
103
104
def verify_rabbitmq_available(logger: Logger, rabbitmq_address: str) -> None:
    """
    Verify that a publisher can connect to the RabbitMQ server at ``rabbitmq_address``.

    The address is parsed, a temporary publisher with delivery confirmations is
    connected, and its connection is closed again right away.

    :param logger: logger forwarded to the publisher connect helper
    :param rabbitmq_address: address of the message queue server
    """
    connection_data = verify_and_parse_mq_uri(rabbitmq_address=rabbitmq_address)
    # The publisher only serves as a connectivity probe, so it is closed immediately
    connect_rabbitmq_publisher(logger, connection_data, enable_acks=True).close_connection()
108