ocrd_network.runtime_data.hosts (rating: F)

Complexity
  Total Complexity: 68

Size/Duplication
  Total Lines: 226
  Duplicated Lines: 0 %

Importance
  Changes: 0

Metric  Value
wmc     68
eloc    187
dl      0
loc     226
rs      2.96
c       0
b       0
f       0

15 Methods

Rating   Name   Duplication   Size   Complexity  
B DataHost.deploy_network_agents() 0 15 6
A DataHost.__parse_network_agents_workers() 0 8 3
A DataHost.__deploy_network_agents_workers() 0 9 4
A DataHost.__parse_network_agents_servers() 0 7 2
A DataHost.__deploy_network_agents_servers() 0 11 4
B DataHost.__deploy_network_agent() 0 24 5
A DataHost.__stop_network_agents_servers() 0 11 4
C DataHost.__append_network_agent_to_lists() 0 18 11
A DataHost.__init__() 0 37 3
A DataHost.__stop_network_agents_workers() 0 11 4
A DataHost.resolve_processor_server_url() 0 9 5
A DataHost.create_connection_client() 0 9 4
A DataHost.__stop_network_agent() 0 13 4
A DataHost.__add_deployed_agent_server_port_to_cache() 0 5 2
B DataHost.stop_network_agents() 0 15 7

How to fix: Complexity

Complex classes like ocrd_network.runtime_data.hosts often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster. A sketch of the Extract Class idea follows.
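As a rough illustration only (not part of ocrd_network; the AgentRegistry name and its methods are hypothetical), the four network_agents_* lists and the methods sharing the __parse_network_agents_* / __deploy_network_agents_* / __stop_network_agents_* prefixes could be pulled into one small registry object that DataHost delegates to:

from collections import defaultdict
from typing import Dict, List, Tuple


class AgentRegistry:
    # Hypothetical extracted component: replaces the four parallel
    # network_agents_* lists with a single mapping keyed by type.
    def __init__(self) -> None:
        # Key: (agent_type, deploy_type), Value: list of agent data objects
        self._agents: Dict[Tuple[str, str], List] = defaultdict(list)

    def add(self, agent_data) -> None:
        self._agents[(agent_data.agent_type, agent_data.deploy_type)].append(agent_data)

    def by_type(self, agent_type: str, deploy_type: str) -> List:
        return self._agents[(agent_type, deploy_type)]

    def clear(self) -> None:
        self._agents.clear()

The deploy and stop helpers of DataHost would then iterate over registry.by_type(...) instead of branching over four separate attributes, which removes most of the duplicated loops that drive the complexity figures above.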

from logging import Logger
from time import sleep
from typing import Dict, List, Union

from .connection_clients import create_docker_client, create_ssh_client
from .network_agents import AgentType, DataNetworkAgent, DataProcessingWorker, DataProcessorServer, DeployType


class DataHost:
    def __init__(
        self, host: str, username: str, password: str, keypath: str, workers: List[Dict], servers: List[Dict]
    ) -> None:
        self.host = host
        self.username = username
        self.password = password
        self.keypath = keypath

        # These flags are used to track whether a connection of the specified
        # type should be created based on the received config file
        self.needs_ssh_connector: bool = False
        self.needs_docker_connector: bool = False

        # Connection clients, ssh for native deployment, docker for docker deployment
        self.ssh_client = None
        self.docker_client = None

        # Time to wait between deploying agents
        self.wait_between_agent_deploys: float = 0.3

        # Lists of network agents based on their agent and deployment type
        self.network_agents_worker_native = []
        self.network_agents_worker_docker = []
        self.network_agents_server_native = []
        self.network_agents_server_docker = []

        if not workers:
            workers = []
        if not servers:
            servers = []

        self.__parse_network_agents_workers(processing_workers=workers)
        self.__parse_network_agents_servers(processor_servers=servers)

        # Used for caching deployed Processor Servers' ports on the current host
        # Key: processor_name, Value: list of ports
        self.processor_servers_ports: dict = {}

    def __add_deployed_agent_server_port_to_cache(self, processor_name: str, port: int) -> None:
        if processor_name not in self.processor_servers_ports:
            self.processor_servers_ports[processor_name] = [port]
            return
        self.processor_servers_ports[processor_name].append(port)

    def __append_network_agent_to_lists(self, agent_data: DataNetworkAgent) -> None:
        if agent_data.deploy_type != DeployType.DOCKER and agent_data.deploy_type != DeployType.NATIVE:
            raise ValueError(f"Network agent deploy type is unknown: {agent_data.deploy_type}")
        if agent_data.agent_type != AgentType.PROCESSING_WORKER and agent_data.agent_type != AgentType.PROCESSOR_SERVER:
            raise ValueError(f"Network agent type is unknown: {agent_data.agent_type}")

        if agent_data.deploy_type == DeployType.NATIVE:
            self.needs_ssh_connector = True
            if agent_data.agent_type == AgentType.PROCESSING_WORKER:
                self.network_agents_worker_native.append(agent_data)
            if agent_data.agent_type == AgentType.PROCESSOR_SERVER:
                self.network_agents_server_native.append(agent_data)
        if agent_data.deploy_type == DeployType.DOCKER:
            self.needs_docker_connector = True
            if agent_data.agent_type == AgentType.PROCESSING_WORKER:
                self.network_agents_worker_docker.append(agent_data)
            if agent_data.agent_type == AgentType.PROCESSOR_SERVER:
                self.network_agents_server_docker.append(agent_data)

    def __parse_network_agents_servers(self, processor_servers: List[Dict]):
        for server in processor_servers:
            server_data = DataProcessorServer(
                processor_name=server["name"], deploy_type=server["deploy_type"], host=self.host,
                port=int(server["port"]), init_by_config=True, pid=None
            )
            self.__append_network_agent_to_lists(agent_data=server_data)

    def __parse_network_agents_workers(self, processing_workers: List[Dict]):
        for worker in processing_workers:
            worker_data = DataProcessingWorker(
                processor_name=worker["name"], deploy_type=worker["deploy_type"], host=self.host,
                init_by_config=True, pid=None
            )
            for _ in range(int(worker["number_of_instance"])):
                self.__append_network_agent_to_lists(agent_data=worker_data)

    def create_connection_client(self, client_type: str):
        if client_type not in ["docker", "ssh"]:
            raise ValueError(f"Host client type cannot be of type: {client_type}")
        if client_type == "ssh":
            self.ssh_client = create_ssh_client(self.host, self.username, self.password, self.keypath)
            return self.ssh_client
        if client_type == "docker":
            self.docker_client = create_docker_client(self.host, self.username, self.password, self.keypath)
            return self.docker_client

    def __deploy_network_agent(
        self, logger: Logger, agent_data: Union[DataProcessorServer, DataProcessingWorker],
        mongodb_url: str, rabbitmq_url: str
    ) -> None:
        deploy_type = agent_data.deploy_type
        agent_type = agent_data.agent_type
        name = agent_data.processor_name
        agent_info = f"network agent: {agent_type}, deploy: {deploy_type}, name: {name}, host: {self.host}"
        logger.info(f"Deploying {agent_info}")

        connection_client = None
        if deploy_type == DeployType.NATIVE:
            assert self.ssh_client, "SSH client connection missing."
            connection_client = self.ssh_client
        if deploy_type == DeployType.DOCKER:
            assert self.docker_client, "Docker client connection missing."
            connection_client = self.docker_client

        if agent_type == AgentType.PROCESSING_WORKER:
            agent_data.deploy_network_agent(logger, connection_client, mongodb_url, rabbitmq_url)
        if agent_type == AgentType.PROCESSOR_SERVER:
            agent_data.deploy_network_agent(logger, connection_client, mongodb_url)

        sleep(self.wait_between_agent_deploys)

    def __deploy_network_agents_workers(self, logger: Logger, mongodb_url: str, rabbitmq_url: str):
        logger.info(f"Deploying processing workers on host: {self.host}")
        amount_workers = len(self.network_agents_worker_native) + len(self.network_agents_worker_docker)
        if not amount_workers:
            logger.info("No processing workers found to be deployed")
        for data_worker in self.network_agents_worker_native:
            self.__deploy_network_agent(logger, data_worker, mongodb_url, rabbitmq_url)
        for data_worker in self.network_agents_worker_docker:
            self.__deploy_network_agent(logger, data_worker, mongodb_url, rabbitmq_url)

    def __deploy_network_agents_servers(self, logger: Logger, mongodb_url: str, rabbitmq_url: str):
        logger.info(f"Deploying processor servers on host: {self.host}")
        amount_servers = len(self.network_agents_server_native) + len(self.network_agents_server_docker)
        if not amount_servers:
            logger.info("No processor servers found to be deployed")
        for data_server in self.network_agents_server_native:
            self.__deploy_network_agent(logger, data_server, mongodb_url, rabbitmq_url)
            self.__add_deployed_agent_server_port_to_cache(data_server.processor_name, data_server.port)
        for data_server in self.network_agents_server_docker:
            self.__deploy_network_agent(logger, data_server, mongodb_url, rabbitmq_url)
            self.__add_deployed_agent_server_port_to_cache(data_server.processor_name, data_server.port)

    def deploy_network_agents(self, logger: Logger, mongodb_url: str, rabbitmq_url: str) -> None:
        if self.needs_ssh_connector and not self.ssh_client:
            logger.debug("Creating missing ssh connector before deploying")
            self.ssh_client = self.create_connection_client(client_type="ssh")
        if self.needs_docker_connector and not self.docker_client:
            logger.debug("Creating missing docker connector before deploying")
            self.docker_client = self.create_connection_client(client_type="docker")
        self.__deploy_network_agents_workers(logger=logger, mongodb_url=mongodb_url, rabbitmq_url=rabbitmq_url)
        self.__deploy_network_agents_servers(logger=logger, mongodb_url=mongodb_url, rabbitmq_url=rabbitmq_url)
        if self.ssh_client:
            self.ssh_client.close()
            self.ssh_client = None
        if self.docker_client:
            self.docker_client.close()
            self.docker_client = None

    def __stop_network_agent(self, logger: Logger, name: str, deploy_type: DeployType, agent_type: AgentType, pid: str):
        agent_info = f"network agent: {agent_type}, deploy: {deploy_type}, name: {name}"
        if not pid:
            logger.warning(f"No pid was passed for {agent_info}")
            return
        agent_info += f", pid: {pid}"
        logger.info(f"Stopping {agent_info}")
        if deploy_type == DeployType.NATIVE:
            assert self.ssh_client, "SSH client connection missing"
            self.ssh_client.exec_command(f"kill {pid}")
        if deploy_type == DeployType.DOCKER:
            assert self.docker_client, "Docker client connection missing"
            self.docker_client.containers.get(pid).stop()

    def __stop_network_agents_workers(self, logger: Logger):
        logger.info(f"Stopping processing workers on host: {self.host}")
        amount_workers = len(self.network_agents_worker_native) + len(self.network_agents_worker_docker)
        if not amount_workers:
            logger.warning("No active processing workers to be stopped.")
        for worker in self.network_agents_worker_native:
            self.__stop_network_agent(logger, worker.processor_name, worker.deploy_type, worker.agent_type, worker.pid)
        self.network_agents_worker_native = []
        for worker in self.network_agents_worker_docker:
            self.__stop_network_agent(logger, worker.processor_name, worker.deploy_type, worker.agent_type, worker.pid)
        self.network_agents_worker_docker = []

    def __stop_network_agents_servers(self, logger: Logger):
        logger.info(f"Stopping processor servers on host: {self.host}")
        amount_servers = len(self.network_agents_server_native) + len(self.network_agents_server_docker)
        if not amount_servers:
            logger.warning("No active processor servers to be stopped.")
        for server in self.network_agents_server_native:
            self.__stop_network_agent(logger, server.processor_name, server.deploy_type, server.agent_type, server.pid)
        self.network_agents_server_native = []
        for server in self.network_agents_server_docker:
            self.__stop_network_agent(logger, server.processor_name, server.deploy_type, server.agent_type, server.pid)
        self.network_agents_server_docker = []

    def stop_network_agents(self, logger: Logger):
        if self.needs_ssh_connector and not self.ssh_client:
            logger.debug("Creating missing ssh connector before stopping")
            self.ssh_client = self.create_connection_client(client_type="ssh")
        if self.needs_docker_connector and not self.docker_client:
            logger.debug("Creating missing docker connector before stopping")
            self.docker_client = self.create_connection_client(client_type="docker")
        self.__stop_network_agents_workers(logger=logger)
        self.__stop_network_agents_servers(logger=logger)
        if self.ssh_client:
            self.ssh_client.close()
            self.ssh_client = None
        if self.docker_client:
            self.docker_client.close()
            self.docker_client = None

    def resolve_processor_server_url(self, processor_name: str) -> str:
        processor_server_url = ''
        for data_server in self.network_agents_server_docker:
            if data_server.processor_name == processor_name:
                processor_server_url = f"http://{self.host}:{data_server.port}/"
        for data_server in self.network_agents_server_native:
            if data_server.processor_name == processor_name:
                processor_server_url = f"http://{self.host}:{data_server.port}/"
        return processor_server_url
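For orientation only, a minimal usage sketch of DataHost. The host name, credentials, URLs, and processor names are placeholders; the worker/server dict keys mirror what __parse_network_agents_workers() and __parse_network_agents_servers() read above, and the "native"/"docker" deploy_type values are assumed to be what the DataNetworkAgent constructors accept.

from logging import getLogger

# Hypothetical config entries; keys follow the parser methods above.
workers = [{"name": "ocrd-example-worker", "deploy_type": "native", "number_of_instance": 2}]
servers = [{"name": "ocrd-example-server", "deploy_type": "docker", "port": 8080}]

data_host = DataHost(
    host="worker-host.example", username="ocrd", password="", keypath="/path/to/ssh/key",
    workers=workers, servers=servers
)
data_host.deploy_network_agents(
    logger=getLogger(__name__),
    mongodb_url="mongodb://db-host:27017",
    rabbitmq_url="amqp://rabbitmq-host:5672"
)
print(data_host.resolve_processor_server_url("ocrd-example-server"))
data_host.stop_network_agents(logger=getLogger(__name__))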