Passed
Pull Request — master (#1319)
by Konstantin
02:15
created

ocrd_network.runtime_data.hosts   F

Complexity

Total Complexity 64

Size/Duplication

Total Lines 217
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 64
eloc 183
dl 0
loc 217
rs 3.28
c 0
b 0
f 0

15 Methods

Rating   Name   Duplication   Size   Complexity  
B DataHost.deploy_network_agents() 0 16 6
A DataHost.__deploy_network_agents_processing_workers() 0 11 4
A DataHost.__parse_network_agents() 0 14 4
A DataHost.__stop_network_agent_resource_manager_server() 0 5 1
A DataHost.__deploy_network_agent_resource_manager_server() 0 6 1
A DataHost.__stop_network_agents_servers() 0 11 4
A DataHost.__deploy_network_agents_processor_servers() 0 13 4
C DataHost.__append_network_agent_to_lists() 0 18 11
A DataHost.__init__() 0 35 3
A DataHost.resolve_processor_server_url() 0 9 5
A DataHost.__stop_network_agents_workers() 0 11 4
A DataHost.create_connection_client() 0 9 4
A DataHost.__stop_network_agent() 0 13 4
A DataHost.__add_deployed_agent_server_port_to_cache() 0 5 2
B DataHost.stop_network_agents() 0 16 7

How to fix   Complexity   

Complexity

Complex classes like ocrd_network.runtime_data.hosts often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from logging import Logger
2
from typing import Dict, List, Union
3
4
from ..constants import RESOURCE_MANAGER_SERVER_PORT
5
from .connection_clients import create_docker_client, create_ssh_client
6
from .network_agents import (
7
    AgentType, DataProcessingWorker, DataProcessorServer, DeployType, deploy_agent_native_get_pid_hack)
8
9
10
class DataHost:
11
    def __init__(
12
        self, host: str, username: str, password: str, keypath: str, workers: List[Dict], servers: List[Dict]
13
    ) -> None:
14
        self.host = host
15
        self.resource_manager_port = RESOURCE_MANAGER_SERVER_PORT
16
        self.resource_manager_pid = None
17
        self.username = username
18
        self.password = password
19
        self.keypath = keypath
20
21
        # These flags are used to track whether a connection of the specified
22
        # type should be created based on the received config file
23
        self.needs_ssh_connector: bool = False
24
        self.needs_docker_connector: bool = False
25
26
        # Connection clients, ssh for native deployment, docker for docker deployment
27
        self.ssh_client = None
28
        self.docker_client = None
29
30
        # Lists of network agents based on their agent and deployment type
31
        self.network_agents_worker_native: List[DataProcessingWorker] = []
32
        self.network_agents_worker_docker: List[DataProcessingWorker] = []
33
        self.network_agents_server_native: List[DataProcessorServer] = []
34
        self.network_agents_server_docker: List[DataProcessorServer] = []
35
36
        if not workers:
37
            workers = []
38
        if not servers:
39
            servers = []
40
41
        self.__parse_network_agents(processing_workers=workers, processor_servers=servers)
42
43
        # Used for caching deployed Processor Servers' ports on the current host
44
        # Key: processor_name, Value: list of ports
45
        self.processor_servers_ports: dict = {}
46
47
    def __add_deployed_agent_server_port_to_cache(self, processor_name: str, port: int) -> None:
48
        if processor_name not in self.processor_servers_ports:
49
            self.processor_servers_ports[processor_name] = [port]
50
            return
51
        self.processor_servers_ports[processor_name] = self.processor_servers_ports[processor_name].append(port)
52
53
    def __append_network_agent_to_lists(self, agent_data: Union[DataProcessingWorker, DataProcessorServer]) -> None:
54
        if agent_data.deploy_type != DeployType.DOCKER and agent_data.deploy_type != DeployType.NATIVE:
55
            raise ValueError(f"Network agent deploy type is unknown: {agent_data.deploy_type}")
56
        if agent_data.agent_type != AgentType.PROCESSING_WORKER and agent_data.agent_type != AgentType.PROCESSOR_SERVER:
57
            raise ValueError(f"Network agent type is unknown: {agent_data.agent_type}")
58
59
        if agent_data.deploy_type == DeployType.NATIVE:
60
            self.needs_ssh_connector = True
61
            if agent_data.agent_type == AgentType.PROCESSING_WORKER:
62
                self.network_agents_worker_native.append(agent_data)
63
            elif agent_data.agent_type == AgentType.PROCESSOR_SERVER:
64
                self.network_agents_server_native.append(agent_data)
65
        elif agent_data.deploy_type == DeployType.DOCKER:
66
            self.needs_docker_connector = True
67
            if agent_data.agent_type == AgentType.PROCESSING_WORKER:
68
                self.network_agents_worker_docker.append(agent_data)
69
            elif agent_data.agent_type == AgentType.PROCESSOR_SERVER:
70
                self.network_agents_server_docker.append(agent_data)
71
72
    def __parse_network_agents(self, processing_workers: List[Dict], processor_servers: List[Dict]):
73
        for worker in processing_workers:
74
            worker_data = DataProcessingWorker(
75
                processor_name=worker["name"], deploy_type=worker["deploy_type"], host=self.host,
76
                init_by_config=True, pid=None
77
            )
78
            for _ in range(int(worker["number_of_instance"])):
79
                self.__append_network_agent_to_lists(agent_data=worker_data)
80
        for server in processor_servers:
81
            server_data = DataProcessorServer(
82
                processor_name=server["name"], deploy_type=server["deploy_type"], host=self.host,
83
                port=int(server["port"]), init_by_config=True, pid=None
84
            )
85
            self.__append_network_agent_to_lists(agent_data=server_data)
86
87
    def create_connection_client(self, client_type: str):
88
        if client_type not in ["docker", "ssh"]:
89
            raise ValueError(f"Host client type cannot be of type: {client_type}")
90
        if client_type == "ssh":
91
            self.ssh_client = create_ssh_client(self.host, self.username, self.password, self.keypath)
92
            return self.ssh_client
93
        if client_type == "docker":
94
            self.docker_client = create_docker_client(self.host, self.username, self.password, self.keypath)
95
            return self.docker_client
96
97
    def __deploy_network_agent_resource_manager_server(self, logger: Logger):
98
        logger.info(f"Deploying resource manager server on host: {self.host}:{self.resource_manager_port}")
99
        start_cmd = f"ocrd network resmgr-server --address {self.host}:{self.resource_manager_port} &"
100
        pid = deploy_agent_native_get_pid_hack(logger, self.ssh_client, start_cmd)
101
        logger.info(f"Deployed: OCR-D Resource Manager Server [{pid}]: {self.host}:{self.resource_manager_port}")
102
        self.resource_manager_pid = pid
103
104
    def __deploy_network_agents_processing_workers(self, logger: Logger, mongodb_url: str, rabbitmq_url: str):
105
        logger.info(f"Deploying processing workers on host: {self.host}")
106
        amount_workers = len(self.network_agents_worker_native) + len(self.network_agents_worker_docker)
107
        if not amount_workers:
108
            logger.info("No processing workers found to be deployed")
109
        for data_worker in self.network_agents_worker_native:
110
            data_worker.deploy_network_agent(logger, self.ssh_client, mongodb_url, rabbitmq_url)
111
            logger.info(f"Deployed: {data_worker}")
112
        for data_worker in self.network_agents_worker_docker:
113
            data_worker.deploy_network_agent(logger, self.docker_client, mongodb_url, rabbitmq_url)
114
            logger.info(f"Deployed: {data_worker}")
115
116
    def __deploy_network_agents_processor_servers(self, logger: Logger, mongodb_url: str):
117
        logger.info(f"Deploying processor servers on host: {self.host}")
118
        amount_servers = len(self.network_agents_server_native) + len(self.network_agents_server_docker)
119
        if not amount_servers:
120
            logger.info("No processor servers found to be deployed")
121
        for data_server in self.network_agents_server_native:
122
            data_server.deploy_network_agent(logger, self.ssh_client, mongodb_url)
123
            self.__add_deployed_agent_server_port_to_cache(data_server.processor_name, data_server.port)
124
            logger.info(f"Deployed: {data_server}")
125
        for data_server in self.network_agents_server_docker:
126
            data_server.deploy_network_agent(logger, self.docker_client, mongodb_url)
127
            self.__add_deployed_agent_server_port_to_cache(data_server.processor_name, data_server.port)
128
            logger.info(f"Deployed: {data_server}")
129
130
    def deploy_network_agents(self, logger: Logger, mongodb_url: str, rabbitmq_url: str) -> None:
131
        if self.needs_ssh_connector and not self.ssh_client:
132
            logger.debug("Creating missing ssh connector before deploying")
133
            self.ssh_client = self.create_connection_client(client_type="ssh")
134
        if self.needs_docker_connector:
135
            logger.debug("Creating missing docker connector before deploying")
136
            self.docker_client = self.create_connection_client(client_type="docker")
137
        self.__deploy_network_agent_resource_manager_server(logger)
138
        self.__deploy_network_agents_processing_workers(logger, mongodb_url, rabbitmq_url)
139
        self.__deploy_network_agents_processor_servers(logger, mongodb_url)
140
        if self.ssh_client:
141
            self.ssh_client.close()
142
            self.ssh_client = None
143
        if self.docker_client:
144
            self.docker_client.close()
145
            self.docker_client = None
146
147
    def __stop_network_agent_resource_manager_server(self, logger: Logger):
148
        logger.info(f"Stopping OCR-D Resource Manager Server [{self.resource_manager_pid}]: "
149
                    f"{self.host}:{self.resource_manager_port}")
150
        assert self.ssh_client, f"SSH client connection missing"
151
        self.ssh_client.exec_command(f"kill {self.resource_manager_pid}")
152
153
    def __stop_network_agent(self, logger: Logger, name: str, deploy_type: DeployType, agent_type: AgentType, pid: str):
154
        agent_info = f"network agent: {agent_type}, deploy: {deploy_type}, name: {name}"
155
        if not pid:
156
            logger.warning(f"No pid was passed for {agent_info}")
157
            return
158
        agent_info += f", pid: {pid}"
159
        logger.info(f"Stopping {agent_info}")
160
        if deploy_type == DeployType.NATIVE:
161
            assert self.ssh_client, "SSH client connection missing"
162
            self.ssh_client.exec_command(f"kill {pid}")
163
        if deploy_type == DeployType.DOCKER:
164
            assert self.docker_client, "Docker client connection missing"
165
            self.docker_client.containers.get(pid).stop()
166
167
    def __stop_network_agents_workers(self, logger: Logger):
168
        logger.info(f"Stopping processing workers on host: {self.host}")
169
        amount_workers = len(self.network_agents_worker_native) + len(self.network_agents_worker_docker)
170
        if not amount_workers:
171
            logger.warning("No active processing workers to be stopped.")
172
        for worker in self.network_agents_worker_native:
173
            self.__stop_network_agent(logger, worker.processor_name, worker.deploy_type, worker.agent_type, worker.pid)
174
        self.network_agents_worker_native = []
175
        for worker in self.network_agents_worker_docker:
176
            self.__stop_network_agent(logger, worker.processor_name, worker.deploy_type, worker.agent_type, worker.pid)
177
        self.network_agents_worker_docker = []
178
179
    def __stop_network_agents_servers(self, logger: Logger):
180
        logger.info(f"Stopping processor servers on host: {self.host}")
181
        amount_servers = len(self.network_agents_server_native) + len(self.network_agents_server_docker)
182
        if not amount_servers:
183
            logger.warning("No active processor servers to be stopped.")
184
        for server in self.network_agents_server_native:
185
            self.__stop_network_agent(logger, server.processor_name, server.deploy_type, server.agent_type, server.pid)
186
        self.network_agents_server_native = []
187
        for server in self.network_agents_server_docker:
188
            self.__stop_network_agent(logger, server.processor_name, server.deploy_type, server.agent_type, server.pid)
189
        self.network_agents_server_docker = []
190
191
    def stop_network_agents(self, logger: Logger):
192
        if self.needs_ssh_connector and not self.ssh_client:
193
            logger.debug("Creating missing ssh connector before stopping")
194
            self.ssh_client = self.create_connection_client(client_type="ssh")
195
        if self.needs_docker_connector and not self.docker_client:
196
            logger.debug("Creating missing docker connector before stopping")
197
            self.docker_client = self.create_connection_client(client_type="docker")
198
        self.__stop_network_agent_resource_manager_server(logger=logger)
199
        self.__stop_network_agents_workers(logger=logger)
200
        self.__stop_network_agents_servers(logger=logger)
201
        if self.ssh_client:
202
            self.ssh_client.close()
203
            self.ssh_client = None
204
        if self.docker_client:
205
            self.docker_client.close()
206
            self.docker_client = None
207
208
    def resolve_processor_server_url(self, processor_name: str) -> str:
209
        processor_server_url = ''
210
        for data_server in self.network_agents_server_docker:
211
            if data_server.processor_name == processor_name:
212
                processor_server_url = f"http://{self.host}:{data_server.port}/"
213
        for data_server in self.network_agents_server_native:
214
            if data_server.processor_name == processor_name:
215
                processor_server_url = f"http://{self.host}:{data_server.port}/"
216
        return processor_server_url
217