Passed: Pull Request — master (#1191), created by unknown, 03:12

ocrd_network.runtime_data.deployer — Rating: A

Complexity

Total Complexity: 41

Size/Duplication

Total Lines: 184
Duplicated Lines: 0 %

Importance

Changes: 0

Metric                              Value
wmc (weighted method count)         41
eloc (executable lines of code)     128
dl (duplicated lines)               0
loc (lines of code)                 184
rs                                  9.1199
c                                   0
b                                   0
f                                   0

12 Methods

Rating  Name                                     Duplication  Size  Complexity
A       Deployer.deploy_rabbitmq()               0            3     1
A       Deployer.stop_network_agents()           0            4     2
A       Deployer.stop_unix_mets_server()         0            21    3
A       Deployer.deploy_mongodb()                0            3     1
A       Deployer.resolve_processor_server_url()  0            7     4
A       Deployer.deploy_network_agents()         0            4     2
A       Deployer.__init__()                      0            8     1
F       Deployer.find_matching_processors()      0            63    22
A       Deployer.stop_mongodb()                  0            2     1
A       Deployer.stop_rabbitmq()                 0            2     1
A       Deployer.start_unix_mets_server()        0            21    2
A       Deployer.stop_all()                      0            9     1

How to fix: Complexity

Complex classes like ocrd_network.runtime_data.deployer often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring, as sketched below. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
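
For illustration only (this sketch is not part of the analyzed file): in Deployer, the mets_servers field and the two *_unix_mets_server methods form such a cohesive component, sharing the unix_mets_server suffix. Extract Class could move them into a hypothetical helper, leaving Deployer to delegate:

    from logging import Logger
    from pathlib import Path
    from typing import Dict


    class MetsServerManager:
        """Hypothetical extracted class owning the METS server lifecycle."""

        def __init__(self, logger: Logger) -> None:
            self.log = logger
            # Maps the METS server socket path to the server process PID
            self.mets_servers: Dict[Path, int] = {}

        def start_unix_mets_server(self, mets_path: str) -> Path:
            ...  # body moved verbatim from Deployer.start_unix_mets_server

        def stop_unix_mets_server(self, mets_server_url: str, stop_with_pid: bool = False) -> None:
            ...  # body moved verbatim from Deployer.stop_unix_mets_server

Deployer.__init__ would then create self.mets_server_manager = MetsServerManager(self.log) and forward the two calls, dropping its own mets_servers bookkeeping and reducing its method count and complexity.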

"""
Abstraction of the deployment functionality for processors.

The Processing Server provides the configuration parameters to the Deployer agent.
The Deployer agent runs the RabbitMQ Server, MongoDB and the Processing Hosts.
Each Processing Host may have several Processing Workers.
Each Processing Worker is an instance of an OCR-D processor.
"""
from __future__ import annotations
from pathlib import Path
from subprocess import Popen, run as subprocess_run
from time import sleep
from typing import Dict, List, Union

from ocrd_utils import config, getLogger, safe_filename
from ..constants import DeployType
from ..logging_utils import get_mets_server_logging_file_path
from ..utils import is_mets_server_running, stop_mets_server
from .config_parser import parse_hosts_data, parse_mongodb_data, parse_rabbitmq_data, validate_and_load_config
from .hosts import DataHost
from .network_services import DataMongoDB, DataRabbitMQ


class Deployer:
    def __init__(self, config_path: str) -> None:
        self.log = getLogger("ocrd_network.deployer")
        ps_config = validate_and_load_config(config_path)
        self.data_mongo: DataMongoDB = parse_mongodb_data(ps_config["database"])
        self.data_queue: DataRabbitMQ = parse_rabbitmq_data(ps_config["process_queue"])
        self.data_hosts: List[DataHost] = parse_hosts_data(ps_config["hosts"])
        self.internal_callback_url = ps_config.get("internal_callback_url", None)
        # Maps the METS server socket path (Path) to the server process PID (int)
        self.mets_servers: Dict[Path, int] = {}
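
    # Illustrative sketch (not part of the original file): the shape of ps_config
    # as consumed by __init__ above. Only the top-level keys are taken from the
    # code; the nested values are placeholders.
    #     ps_config = {
    #         "database": {...},             # parsed by parse_mongodb_data
    #         "process_queue": {...},        # parsed by parse_rabbitmq_data
    #         "hosts": [...],                # parsed by parse_hosts_data
    #         "internal_callback_url": None, # optional
    #     }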

    # TODO: Reconsider this.
    def find_matching_processors(
            self,
            worker_only: bool = False,
            server_only: bool = False,
            docker_only: bool = False,
            native_only: bool = False,
            str_names_only: bool = False,
            unique_only: bool = False
    ) -> Union[List[str], List[object]]:
        """Finds and returns a list of matching data objects of type:
        `DataProcessingWorker` and `DataProcessorServer`.

        :py:attr:`worker_only` match only processors with worker status
        :py:attr:`server_only` match only processors with server status
        :py:attr:`docker_only` match only docker processors
        :py:attr:`native_only` match only native processors
        :py:attr:`str_names_only` return the processor names instead of data objects
        :py:attr:`unique_only` remove duplicates from the matches

        `worker_only` and `server_only` are mutually exclusive.
        `docker_only` and `native_only` are mutually exclusive.
        `unique_only` is allowed only together with `str_names_only`.
        """

        if worker_only and server_only:
            msg = "Only 'worker_only' or 'server_only' is allowed, not both."
            self.log.error(msg)
            raise ValueError(msg)
        if docker_only and native_only:
            msg = "Only 'docker_only' or 'native_only' is allowed, not both."
            self.log.error(msg)
            raise ValueError(msg)
        if not str_names_only and unique_only:
            msg = "Value 'unique_only' is allowed only together with 'str_names_only'."
            self.log.error(msg)
            raise ValueError(msg)

        # Find all matching objects of type:
        # DataProcessingWorker or DataProcessorServer
        matched_objects = []
        for data_host in self.data_hosts:
            if not server_only:
                for data_worker in data_host.network_agents_worker:
                    if data_worker.deploy_type == DeployType.NATIVE and docker_only:
                        continue
                    if data_worker.deploy_type == DeployType.DOCKER and native_only:
                        continue
                    matched_objects.append(data_worker)
            if not worker_only:
                for data_server in data_host.network_agents_server:
                    if data_server.deploy_type == DeployType.NATIVE and docker_only:
                        continue
                    if data_server.deploy_type == DeployType.DOCKER and native_only:
                        continue
                    matched_objects.append(data_server)
        if str_names_only:
            # gets only the processor names of the matched objects
            name_list = [match.processor_name for match in matched_objects]
            if unique_only:
                # removes the duplicates, if any, while preserving order
                return list(dict.fromkeys(name_list))
            return name_list
        return matched_objects
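
    # Usage sketch (illustrative addition, not in the original file): collect the
    # unique names of all natively deployed worker processors.
    #     worker_names = deployer.find_matching_processors(
    #         worker_only=True, native_only=True,
    #         str_names_only=True, unique_only=True)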

    def resolve_processor_server_url(self, processor_name: str) -> str:
        processor_server_url = ''
        for data_host in self.data_hosts:
            for data_server in data_host.network_agents_server:
                if data_server.processor_name == processor_name:
                    processor_server_url = f"http://{data_host.host}:{data_server.port}/"
        return processor_server_url

    def deploy_network_agents(self, mongodb_url: str, rabbitmq_url: str) -> None:
        self.log.debug("Deploying processing workers/processor servers...")
        for host_data in self.data_hosts:
            host_data.deploy_network_agents(logger=self.log, mongodb_url=mongodb_url, rabbitmq_url=rabbitmq_url)

    def stop_network_agents(self) -> None:
        self.log.debug("Stopping processing workers/processor servers...")
        for host_data in self.data_hosts:
            host_data.stop_network_agents(logger=self.log)

    def deploy_rabbitmq(self) -> str:
        self.data_queue.deploy_rabbitmq(self.log)
        return self.data_queue.service_url

    def stop_rabbitmq(self) -> None:
        self.data_queue.stop_service_rabbitmq(self.log)

    def deploy_mongodb(self) -> str:
        self.data_mongo.deploy_mongodb(self.log)
        return self.data_mongo.service_url

    def stop_mongodb(self) -> None:
        self.data_mongo.stop_service_mongodb(self.log)
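
    # Usage sketch (illustrative addition, not in the original file): the service
    # URLs returned by the two deploy_* methods feed the network agents.
    #     rabbitmq_url = deployer.deploy_rabbitmq()
    #     mongodb_url = deployer.deploy_mongodb()
    #     deployer.deploy_network_agents(mongodb_url=mongodb_url, rabbitmq_url=rabbitmq_url)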

    def stop_all(self) -> None:
        """
        The stopping order is important for a graceful shutdown.
        If the RabbitMQ Server is stopped before the Processing Workers,
        the workers may be left in an unpredictable state.
        """
        self.stop_network_agents()
        self.stop_mongodb()
        self.stop_rabbitmq()

    def start_unix_mets_server(self, mets_path: str) -> Path:
        log_file = get_mets_server_logging_file_path(mets_path=mets_path)
        mets_server_url = Path(config.OCRD_NETWORK_SOCKETS_ROOT_DIR, f"{safe_filename(mets_path)}.sock")
        if is_mets_server_running(mets_server_url=str(mets_server_url)):
            self.log.warning(f"The mets server for {mets_path} is already started: {mets_server_url}")
            return mets_server_url
        cwd = Path(mets_path).parent
        self.log.info(f"Starting UDS mets server: {mets_server_url}")
        # Redirect both streams through a single handle to avoid two
        # unsynchronized writers on the same log file
        log_file_handle = open(log_file, mode="w")
        sub_process = Popen(
            args=["nohup", "ocrd", "workspace", "--mets-server-url", f"{mets_server_url}",
                  "-d", f"{cwd}", "server", "start"],
            shell=False,
            stdout=log_file_handle,
            stderr=log_file_handle,
            cwd=cwd,
            universal_newlines=True
        )
        # Wait for the mets server to start
        sleep(2)
        self.mets_servers[mets_server_url] = sub_process.pid
        return mets_server_url

    def stop_unix_mets_server(self, mets_server_url: str, stop_with_pid: bool = False) -> None:
        self.log.info(f"Stopping UDS mets server: {mets_server_url}")
        if stop_with_pid:
            if Path(mets_server_url) not in self.mets_servers:
                message = f"Mets server not found at URL: {mets_server_url}"
                self.log.error(message)
                raise Exception(message)
            mets_server_pid = self.mets_servers[Path(mets_server_url)]
            subprocess_run(
                args=["kill", "-s", "SIGINT", f"{mets_server_pid}"],
                shell=False,
                universal_newlines=True
            )
            return
        # TODO: Reconsider this again.
        #  Without this sleep, connection errors occur on the last request
        #  processed by the processing worker. Sometimes 3 seconds is enough,
        #  sometimes not.
        sleep(5)
        stop_mets_server(mets_server_url=mets_server_url)
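
# Usage sketch (illustrative addition, not in the original file; the mets path is
# a hypothetical example): pair the METS server lifecycle calls around a run.
#     url = deployer.start_unix_mets_server(mets_path="/data/workspace/mets.xml")
#     ...  # workspace processing talks to the METS server at `url`
#     deployer.stop_unix_mets_server(mets_server_url=str(url), stop_with_pid=True)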