Passed
Pull Request — master (#1191)
by
unknown
03:15
created

ocrd_network.runtime_data.hosts   B

Complexity

Total Complexity 50

Size/Duplication

Total Lines 211
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 50
eloc 170
dl 0
loc 211
rs 8.4
c 0
b 0
f 0

9 Methods

Rating   Name   Duplication   Size   Complexity  
C DataHost.deploy_network_agents() 0 29 10
A DataHost.__parse_network_agents() 0 21 4
B DataHost.__deploy_network_agent() 0 27 7
A DataHost.__append_network_agent_to_lists() 0 16 5
A DataHost.__init__() 0 43 3
A DataHost.create_connection_client() 0 9 4
A DataHost.__stop_network_agent() 0 13 4
C DataHost.stop_network_agents() 0 30 11
A DataHost.__add_deployed_agent_server_port_to_cache() 0 5 2

How to fix   Complexity   

Complexity

Complex classes like ocrd_network.runtime_data.hosts often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from logging import Logger
2
from time import sleep
3
from typing import Dict, List, Union
4
5
from .connection_clients import create_docker_client, create_ssh_client
6
from .network_agents import AgentType, DataNetworkAgent, DataProcessingWorker, DataProcessorServer, DeployType
7
8
9
class DataHost:
    """Runtime data of one host and of the network agents configured to run on it.

    The agents parsed from the configuration are kept in four lists: two keyed
    by deployment type (native / docker) and two keyed by agent type
    (processing worker / processor server). Every agent is a member of exactly
    one list of each pair.
    """

    def __init__(
        self,
        host: str,
        username: str,
        password: str,
        keypath: str,
        workers: List[Dict],
        servers: List[Dict]
    ) -> None:
        """Store the connection settings and parse the configured agents.

        Args:
            host: Address of the host; also passed to every agent data object.
            username: User name handed to the connection client factories.
            password: Password handed to the connection client factories.
            keypath: Key path handed to the connection client factories.
            workers: Processing worker configurations; a falsy value is
                treated as an empty list.
            servers: Processor server configurations; a falsy value is
                treated as an empty list.

        Raises:
            ValueError: If a parsed agent has an unknown deploy or agent type.
        """
        self.host = host
        self.username = username
        self.password = password
        self.keypath = keypath

        # These flags are used to track whether a connection of the specified
        # type should be created based on the received config file
        self.needs_ssh_connector: bool = False
        self.needs_docker_connector: bool = False

        # Connection clients, ssh for native deployment, docker for docker deployment
        self.ssh_client = None
        self.docker_client = None

        # Time to wait between deploying agents
        self.wait_between_agent_deploys: float = 0.3

        # Lists of network agents based on their deployment type
        self.network_agents_docker = []
        self.network_agents_native = []
        # Lists of network agents based on their agent type
        self.network_agents_worker = []
        self.network_agents_server = []

        # Used for caching deployed Processor Servers' ports on the current host
        # Key: processor_name, Value: list of ports
        # Set up before parsing so the instance is fully initialised even if
        # parsing raises part-way through.
        self.processor_servers_ports: dict = {}

        self.__parse_network_agents(processing_workers=workers or [], processor_servers=servers or [])

    def __add_deployed_agent_server_port_to_cache(self, processor_name: str, port: int):
        """Record ``port`` as a deployed port of ``processor_name``.

        ``list.append`` returns ``None``, so its result must never be assigned
        back into the cache; the list is mutated in place instead.
        """
        self.processor_servers_ports.setdefault(processor_name, []).append(port)

    def __append_network_agent_to_lists(self, agent_data: DataNetworkAgent):
        """Register ``agent_data`` in the deploy-type list and the agent-type list.

        Also flags which connection client the host needs for that agent.

        Raises:
            ValueError: If the deploy type or the agent type is not recognised.
        """
        if agent_data.deploy_type == DeployType.NATIVE:
            self.needs_ssh_connector = True
            self.network_agents_native.append(agent_data)
        elif agent_data.deploy_type == DeployType.DOCKER:
            self.needs_docker_connector = True
            self.network_agents_docker.append(agent_data)
        else:
            raise ValueError(f"Network agent deploy type is unknown: {agent_data.deploy_type}")

        if agent_data.agent_type == AgentType.PROCESSING_WORKER:
            self.network_agents_worker.append(agent_data)
        elif agent_data.agent_type == AgentType.PROCESSOR_SERVER:
            self.network_agents_server.append(agent_data)
        else:
            raise ValueError(f"Network agent type is unknown: {agent_data.agent_type}")

    def __parse_network_agents(self, processing_workers: List[Dict], processor_servers: List[Dict]):
        """Build agent data objects from the configuration dictionaries.

        Worker entries are read through the keys ``name``, ``deploy_type`` and
        ``number_of_instance``; server entries through ``name``,
        ``deploy_type`` and ``port``.
        """
        for worker in processing_workers:
            for _ in range(int(worker["number_of_instance"])):
                # One data object per instance: the pid is stored on the object
                # and read back per list entry when stopping, so instances
                # must not share a single object.
                worker_data = DataProcessingWorker(
                    processor_name=worker["name"],
                    deploy_type=worker["deploy_type"],
                    host=self.host,
                    init_by_config=True,
                    pid=None
                )
                self.__append_network_agent_to_lists(agent_data=worker_data)
        for server in processor_servers:
            server_data = DataProcessorServer(
                processor_name=server["name"],
                deploy_type=server["deploy_type"],
                host=self.host,
                port=int(server["port"]),
                init_by_config=True,
                pid=None
            )
            self.__append_network_agent_to_lists(agent_data=server_data)

    def create_connection_client(self, client_type: str):
        """Create, store and return a connection client of the requested type.

        Args:
            client_type: Either ``"ssh"`` or ``"docker"``.

        Returns:
            The newly created client, which is also stored on the instance.

        Raises:
            ValueError: If ``client_type`` is neither ``"ssh"`` nor ``"docker"``.
        """
        if client_type == "ssh":
            self.ssh_client = create_ssh_client(self.host, self.username, self.password, self.keypath)
            return self.ssh_client
        if client_type == "docker":
            self.docker_client = create_docker_client(self.host, self.username, self.password, self.keypath)
            return self.docker_client
        raise ValueError(f"Host client type cannot be of type: {client_type}")

    def __ensure_connection_clients(self, logger: Logger, action: str) -> None:
        """Create every connection client that is needed but not yet open.

        ``action`` only completes the debug message ("deploying" / "stopping").
        """
        if self.needs_ssh_connector and not self.ssh_client:
            logger.debug(f"Creating missing ssh connector before {action}")
            self.create_connection_client(client_type="ssh")
        if self.needs_docker_connector and not self.docker_client:
            logger.debug(f"Creating missing docker connector before {action}")
            self.create_connection_client(client_type="docker")

    def __close_connection_clients(self) -> None:
        """Close and forget whichever connection clients are currently open."""
        if self.ssh_client:
            self.ssh_client.close()
            self.ssh_client = None
        if self.docker_client:
            self.docker_client.close()
            self.docker_client = None

    def __deploy_network_agent(
        self,
        logger: Logger,
        agent_data: Union[DataProcessorServer, DataProcessingWorker],
        mongodb_url: str,
        rabbitmq_url: str
    ):
        """Deploy a single agent and then pause for ``wait_between_agent_deploys``.

        Raises:
            AssertionError: If the client required by the deploy type is missing.
            Exception: For docker deployments, which are not supported yet.
        """
        deploy_type = agent_data.deploy_type
        agent_type = agent_data.agent_type
        name = agent_data.processor_name
        agent_info = f"network agent: {agent_type}, deploy: {deploy_type}, name: {name}, host: {self.host}"
        logger.info(f"Deploying {agent_info}")

        if deploy_type == DeployType.NATIVE:
            assert self.ssh_client, "SSH client connection missing."
            if agent_type == AgentType.PROCESSING_WORKER:
                agent_data.deploy_network_agent(logger, self.ssh_client, mongodb_url, rabbitmq_url)
            if agent_type == AgentType.PROCESSOR_SERVER:
                # Processor servers are deployed without the RabbitMQ URL.
                agent_data.deploy_network_agent(logger, self.ssh_client, mongodb_url)

        if deploy_type == DeployType.DOCKER:
            assert self.docker_client, "Docker client connection missing."
            if agent_type == AgentType.PROCESSING_WORKER:
                raise Exception("Deploying Processing worker of docker type is not supported yet!")
            if agent_type == AgentType.PROCESSOR_SERVER:
                raise Exception("Deploying Processor Server of docker type is not supported yet!")
        sleep(self.wait_between_agent_deploys)

    def deploy_network_agents(self, logger: Logger, mongodb_url: str, rabbitmq_url: str) -> None:
        """Deploy all processing workers, then all processor servers, of this host.

        Missing connection clients are created first. The clients are closed
        again when the method finishes, whether or not a deployment raised.
        The port of every deployed processor server is added to
        ``processor_servers_ports``.
        """
        try:
            self.__ensure_connection_clients(logger, action="deploying")

            logger.info(f"Deploying processing workers on host: {self.host}")
            if not self.network_agents_worker:
                logger.info("No processing workers found to be deployed")
            for data_worker in self.network_agents_worker:
                self.__deploy_network_agent(logger, data_worker, mongodb_url, rabbitmq_url)

            logger.info(f"Deploying processor servers on host: {self.host}")
            if not self.network_agents_server:
                logger.info("No processor servers found to be deployed")
            for data_server in self.network_agents_server:
                self.__deploy_network_agent(logger, data_server, mongodb_url, rabbitmq_url)
                self.__add_deployed_agent_server_port_to_cache(data_server.processor_name, data_server.port)
        finally:
            # Never leave connections open, not even after a failed deployment.
            self.__close_connection_clients()

    def __stop_network_agent(
        self,
        logger: Logger,
        name: str,
        deploy_type: DeployType,
        agent_type: AgentType,
        pid: str
    ):
        """Stop one agent identified by ``pid``; warn and return if no pid is given.

        Native agents are stopped with ``kill <pid>`` sent through the ssh
        client. For docker agents ``pid`` is passed to ``containers.get`` and
        the returned container is stopped.

        Raises:
            AssertionError: If the client required by the deploy type is missing.
        """
        agent_info = f"network agent: {agent_type}, deploy: {deploy_type}, name: {name}"
        if not pid:
            logger.warning(f"No pid was passed for {agent_info}")
            return
        agent_info += f", pid: {pid}"
        logger.info(f"Stopping {agent_info}")
        if deploy_type == DeployType.NATIVE:
            assert self.ssh_client, "SSH client connection missing"
            self.ssh_client.exec_command(f"kill {pid}")
        if deploy_type == DeployType.DOCKER:
            assert self.docker_client, "Docker client connection missing"
            self.docker_client.containers.get(pid).stop()

    def stop_network_agents(self, logger: Logger):
        """Stop all processing workers, then all processor servers, of this host.

        Missing connection clients are created first. Each agent-type list is
        emptied once all of its agents were processed. The clients are closed
        again when the method finishes, whether or not stopping raised.
        """
        try:
            self.__ensure_connection_clients(logger, action="stopping")

            logger.info(f"Stopping processing workers on host: {self.host}")
            if not self.network_agents_worker:
                logger.warning("No active processing workers to be stopped.")
            for worker in self.network_agents_worker:
                self.__stop_network_agent(
                    logger, worker.processor_name, worker.deploy_type, worker.agent_type, worker.pid
                )
            self.network_agents_worker = []

            logger.info(f"Stopping processor servers on host: {self.host}")
            if not self.network_agents_server:
                logger.warning("No active processor servers to be stopped.")
            for server in self.network_agents_server:
                self.__stop_network_agent(
                    logger, server.processor_name, server.deploy_type, server.agent_type, server.pid
                )
            self.network_agents_server = []
        finally:
            # Never leave connections open, not even after a failed stop.
            self.__close_connection_clients()
211