Passed: Pull Request — master (#1191)
created by unknown, 03:08

ocrd_network.runtime_data.deployer (Rating: A)

Complexity
  Total Complexity    37

Size/Duplication
  Total Lines         175
  Duplicated Lines    0 %

Importance
  Changes             0

Metric  Value
wmc     37      (weighted method count)
eloc    120     (executable lines of code)
dl      0       (duplicated lines)
loc     175     (lines of code)
rs      9.44
c       0
b       0
f       0

12 Methods

Rating  Name                                       Duplication  Size  Complexity
A       Deployer.deploy_rabbitmq()                 0            3     1
A       Deployer.stop_network_agents()             0            4     2
F       Deployer.find_matching_network_agents()    0            57    20
A       Deployer.stop_unix_mets_server()           0            21    3
A       Deployer.deploy_mongodb()                  0            3     1
A       Deployer.resolve_processor_server_url()    0            5     2
A       Deployer.deploy_network_agents()           0            4     2
A       Deployer.__init__()                        0            8     1
A       Deployer.stop_mongodb()                    0            2     1
A       Deployer.stop_rabbitmq()                   0            2     1
A       Deployer.start_unix_mets_server()          0            21    2
A       Deployer.stop_all()                        0            9     1
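
The per-method complexities sum to the total WMC reported above:
1 + 2 + 20 + 3 + 1 + 2 + 2 + 1 + 1 + 1 + 2 + 1 = 37.
Deployer.find_matching_network_agents() alone contributes 20 of the 37 points, in line with its F rating.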

"""
Abstraction of the deployment functionality for processors.

The Processing Server provides the configuration parameters to the Deployer agent.
The Deployer agent runs the RabbitMQ Server, MongoDB and the Processing Hosts.
Each Processing Host may have several Processing Workers.
Each Processing Worker is an instance of an OCR-D processor.
"""
from __future__ import annotations
from pathlib import Path
from subprocess import Popen, run as subprocess_run
from time import sleep
from typing import Dict, List, Union

from ocrd_utils import config, getLogger, safe_filename
from ..logging_utils import get_mets_server_logging_file_path
from ..utils import is_mets_server_running, stop_mets_server
from .config_parser import parse_hosts_data, parse_mongodb_data, parse_rabbitmq_data, validate_and_load_config
from .hosts import DataHost
from .network_services import DataMongoDB, DataRabbitMQ


class Deployer:
    def __init__(self, config_path: str) -> None:
        self.log = getLogger("ocrd_network.deployer")
        ps_config = validate_and_load_config(config_path)
        self.data_mongo: DataMongoDB = parse_mongodb_data(ps_config["database"])
        self.data_queue: DataRabbitMQ = parse_rabbitmq_data(ps_config["process_queue"])
        self.data_hosts: List[DataHost] = parse_hosts_data(ps_config["hosts"])
        self.internal_callback_url = ps_config.get("internal_callback_url", None)
        self.mets_servers: Dict = {}  # {"mets_server_url": "mets_server_pid"}
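        # Shape of the Processing Server config, as a sketch inferred from the
        # lookups above (not the complete schema):
        #   database:              MongoDB connection data for parse_mongodb_data()
        #   process_queue:         RabbitMQ connection data for parse_rabbitmq_data()
        #   hosts:                 list of Processing Host entries for parse_hosts_data()
        #   internal_callback_url: optional callback URL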

    # TODO: Reconsider this.
    def find_matching_network_agents(
        self, worker_only: bool = False, server_only: bool = False, docker_only: bool = False,
        native_only: bool = False, str_names_only: bool = False, unique_only: bool = False
    ) -> Union[List[str], List[object]]:
        """Finds and returns a list of matching data objects of type
        `DataProcessingWorker` and `DataProcessorServer`.

        :py:attr:`worker_only` match only worker network agents (DataProcessingWorker)
        :py:attr:`server_only` match only server network agents (DataProcessorServer)
        :py:attr:`docker_only` match only docker network agents (DataProcessingWorker and DataProcessorServer)
        :py:attr:`native_only` match only native network agents (DataProcessingWorker and DataProcessorServer)
        :py:attr:`str_names_only` returns the processor_name field instead of the Data* object
        :py:attr:`unique_only` remove duplicate names from the matches

        `worker_only` and `server_only` are mutually exclusive
        `docker_only` and `native_only` are mutually exclusive
        `unique_only` is allowed only together with `str_names_only`
        """

        if worker_only and server_only:
            msg = "Only 'worker_only' or 'server_only' is allowed, not both."
            self.log.exception(msg)
            raise ValueError(msg)
        if docker_only and native_only:
            msg = "Only 'docker_only' or 'native_only' is allowed, not both."
            self.log.exception(msg)
            raise ValueError(msg)
        if not str_names_only and unique_only:
            msg = "Value 'unique_only' is allowed only together with 'str_names_only'."
            self.log.exception(msg)
            raise ValueError(msg)

        # Find all matching objects of type DataProcessingWorker or DataProcessorServer
        matched_objects = []
        for data_host in self.data_hosts:
            if not server_only:
                if not docker_only:
                    for data_worker in data_host.network_agents_worker_native:
                        matched_objects.append(data_worker)
                if not native_only:
                    for data_worker in data_host.network_agents_worker_docker:
                        matched_objects.append(data_worker)
            if not worker_only:
                if not docker_only:
                    for data_server in data_host.network_agents_server_native:
                        matched_objects.append(data_server)
                if not native_only:
                    for data_server in data_host.network_agents_server_docker:
                        matched_objects.append(data_server)
        if not str_names_only:
            return matched_objects
        # Get only the processor names of the matched objects
        matched_names = [match.processor_name for match in matched_objects]
        if not unique_only:
            return matched_names
        # Remove any duplicate entries from the matched names, preserving order
        return list(dict.fromkeys(matched_names))
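
    # Hypothetical usage sketch (assumes a constructed Deployer instance
    # `deployer`; this flag combination is just one example):
    #   names = deployer.find_matching_network_agents(
    #       worker_only=True, native_only=True,
    #       str_names_only=True, unique_only=True)
    #   # -> unique names of all natively deployed Processing Workers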

    def resolve_processor_server_url(self, processor_name) -> str:
        processor_server_url = ''
        # Note: each iteration overwrites the previous value, so the URL
        # resolved by the last host in the list is the one returned
        for data_host in self.data_hosts:
            processor_server_url = data_host.resolve_processor_server_url(processor_name=processor_name)
        return processor_server_url

    def deploy_network_agents(self, mongodb_url: str, rabbitmq_url: str) -> None:
        self.log.debug("Deploying processing workers/processor servers...")
        for host_data in self.data_hosts:
            host_data.deploy_network_agents(logger=self.log, mongodb_url=mongodb_url, rabbitmq_url=rabbitmq_url)

    def stop_network_agents(self) -> None:
        self.log.debug("Stopping processing workers/processor servers...")
        for host_data in self.data_hosts:
            host_data.stop_network_agents(logger=self.log)

    def deploy_rabbitmq(self) -> str:
        self.data_queue.deploy_rabbitmq(self.log)
        return self.data_queue.service_url

    def stop_rabbitmq(self):
        self.data_queue.stop_service_rabbitmq(self.log)

    def deploy_mongodb(self) -> str:
        self.data_mongo.deploy_mongodb(self.log)
        return self.data_mongo.service_url

    def stop_mongodb(self):
        self.data_mongo.stop_service_mongodb(self.log)

    def stop_all(self) -> None:
        """
        The stopping order is important for a graceful shutdown.
        Stopping the RabbitMQ Server before the Processing Workers may leave
        the Processing Workers in an unpredictable state.
        """
        self.stop_network_agents()
        self.stop_mongodb()
        self.stop_rabbitmq()

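    # Hypothetical lifecycle sketch (the config path is an assumption; the
    # deploy order follows from deploy_network_agents() requiring both URLs):
    #   deployer = Deployer("/path/to/ps_config.yml")
    #   mongodb_url = deployer.deploy_mongodb()
    #   rabbitmq_url = deployer.deploy_rabbitmq()
    #   deployer.deploy_network_agents(mongodb_url=mongodb_url, rabbitmq_url=rabbitmq_url)
    #   ...
    #   deployer.stop_all()  # agents first, then MongoDB, then RabbitMQ
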
    def start_unix_mets_server(self, mets_path: str) -> Path:
        log_file = get_mets_server_logging_file_path(mets_path=mets_path)
        mets_server_url = Path(config.OCRD_NETWORK_SOCKETS_ROOT_DIR, f"{safe_filename(mets_path)}.sock")
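        # Example (hypothetical path): for mets_path "/data/ws1/mets.xml" the
        # socket file lands under OCRD_NETWORK_SOCKETS_ROOT_DIR, its name being
        # safe_filename(mets_path) plus the ".sock" suffix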
        if is_mets_server_running(mets_server_url=str(mets_server_url)):
            self.log.warning(f"The mets server for {mets_path} is already started: {mets_server_url}")
            return mets_server_url
        cwd = Path(mets_path).parent
        self.log.info(f"Starting UDS mets server: {mets_server_url}")
        # Single log file handle shared by stdout and stderr
        log_fd = open(file=log_file, mode="w")
        sub_process = Popen(
            args=["nohup", "ocrd", "workspace", "--mets-server-url", f"{mets_server_url}",
                  "-d", f"{cwd}", "server", "start"],
            shell=False,
            stdout=log_fd,
            stderr=log_fd,
            cwd=cwd,
            universal_newlines=True
        )
        # Wait for the mets server to start
        sleep(2)
        self.mets_servers[mets_server_url] = sub_process.pid
        return mets_server_url

    def stop_unix_mets_server(self, mets_server_url: str, stop_with_pid: bool = False) -> None:
        self.log.info(f"Stopping UDS mets server: {mets_server_url}")
        if stop_with_pid:
            if Path(mets_server_url) not in self.mets_servers:
                message = f"Mets server not found at URL: {mets_server_url}"
                self.log.exception(message)
                raise Exception(message)
            mets_server_pid = self.mets_servers[Path(mets_server_url)]
            subprocess_run(
                args=["kill", "-s", "SIGINT", f"{mets_server_pid}"],
                shell=False,
                universal_newlines=True
            )
            return
        # TODO: Reconsider this again.
        #  Not having this sleep here causes connection errors
        #  on the last request processed by the processing worker.
        #  Sometimes 3 seconds is enough, sometimes not.
        sleep(5)
        stop_mets_server(mets_server_url=mets_server_url)
        return
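
Usage sketch (not part of the module; the config and workspace paths are
hypothetical): starting and stopping a METS server for a single workspace.

    deployer = Deployer("/path/to/ps_config.yml")
    mets_server_url = deployer.start_unix_mets_server(mets_path="/data/ws1/mets.xml")
    # ... network agents operate on the workspace through the UDS socket ...
    deployer.stop_unix_mets_server(mets_server_url=str(mets_server_url))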
175