Passed
Pull Request — master (#1319)
by Konstantin
02:12
created

ocrd_network.runtime_data.hosts   B

Complexity

Total Complexity 44

Size/Duplication

Total Lines 184
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 44
eloc 150
dl 0
loc 184
rs 8.8798
c 0
b 0
f 0

11 Methods

Rating   Name   Duplication   Size   Complexity  
A DataHost.__append_workers_to_lists() 0 10 5
B DataHost.deploy_workers() 0 21 6
A DataHost.__stop_network_agent_resource_manager_server() 0 5 1
A DataHost.__parse_workers() 0 8 3
C DataHost.stop_workers() 0 30 10
A DataHost.__deploy_single_worker() 0 18 3
A DataHost.__deploy_network_agent_resource_manager_server() 0 6 1
A DataHost.__init__() 0 29 3
A DataHost.__deploy_all_workers() 0 11 4
A DataHost.create_connection_client() 0 9 4
A DataHost.__stop_worker() 0 13 4

How to fix   Complexity   

Complexity

Complex classes like ocrd_network.runtime_data.hosts often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from logging import Logger
2
from typing import Dict, List, Optional
3
4
from docker import APIClient
5
from paramiko import SSHClient
6
7
from ..constants import RESOURCE_MANAGER_SERVER_PORT
8
from .connection_clients import CustomDockerClient, create_docker_client, create_ssh_client
9
from .network_agents import (
10
    DataProcessingWorker, DeployType, deploy_agent_native_get_pid_hack)
11
12
13
class DataHost:
14
    def __init__(
15
        self, host: str, username: str, password: str, keypath: str, workers: List[Dict], servers: List[Dict]
16
    ) -> None:
17
        self.host = host
18
        self.resource_manager_port = RESOURCE_MANAGER_SERVER_PORT
19
        self.resource_manager_pid = None
20
        self.username = username
21
        self.password = password
22
        self.keypath = keypath
23
24
        # These flags are used to track whether a connection of the specified
25
        # type should be created based on the received config file
26
        self.needs_ssh_connector: bool = False
27
        self.needs_docker_connector: bool = False
28
29
        # Connection clients, ssh for native deployment, docker for docker deployment
30
        self.ssh_client = None
31
        self.docker_client: Optional[CustomDockerClient] = None
32
33
        # Lists of network agents based on their agent and deployment type
34
        self.workers_native: List[DataProcessingWorker] = []
35
        self.workers_docker: List[DataProcessingWorker] = []
36
37
        if not workers:
38
            workers = []
39
        if not servers:
40
            servers = []
41
42
        self.__parse_workers(processing_workers=workers)
43
44
    def __append_workers_to_lists(self, worker_data: DataProcessingWorker) -> None:
45
        if worker_data.deploy_type != DeployType.DOCKER and worker_data.deploy_type != DeployType.NATIVE:
46
            raise ValueError(f"Processing Worker deploy type is unknown: {worker_data.deploy_type}")
47
48
        if worker_data.deploy_type == DeployType.NATIVE:
49
            self.needs_ssh_connector = True
50
            self.workers_native.append(worker_data)
51
        if worker_data.deploy_type == DeployType.DOCKER:
52
            self.needs_docker_connector = True
53
            self.workers_docker.append(worker_data)
54
55
    def __parse_workers(self, processing_workers: List[Dict]):
56
        for worker in processing_workers:
57
            worker_data = DataProcessingWorker(
58
                processor_name=worker["name"], deploy_type=worker.get("deploy_type", "native"),
59
                host=self.host, init_by_config=True, pid=None
60
            )
61
            for _ in range(int(worker["number_of_instance"])):
62
                self.__append_workers_to_lists(worker_data=worker_data)
63
64
    def create_connection_client(self, client_type: str):
65
        if client_type not in ["docker", "ssh"]:
66
            raise ValueError(f"Host client type cannot be of type: {client_type}")
67
        if client_type == "ssh":
68
            self.ssh_client = create_ssh_client(self.host, self.username, self.password, self.keypath)
69
            return self.ssh_client
70
        if client_type == "docker":
71
            self.docker_client = create_docker_client(self.host, self.username, self.password, self.keypath)
72
            return self.docker_client
73
74
    def __deploy_network_agent_resource_manager_server(self, logger: Logger):
75
        logger.info(f"Deploying resource manager server on host: {self.host}:{self.resource_manager_port}")
76
        start_cmd = f"ocrd network resmgr-server --address {self.host}:{self.resource_manager_port} &"
77
        pid = deploy_agent_native_get_pid_hack(logger, self.ssh_client, start_cmd)
78
        logger.info(f"Deployed: OCR-D Resource Manager Server [{pid}]: {self.host}:{self.resource_manager_port}")
79
        self.resource_manager_pid = pid
80
81
    def __deploy_single_worker(
82
        self, logger: Logger, worker_data: DataProcessingWorker,
83
        mongodb_url: str, rabbitmq_url: str
84
    ) -> None:
85
        deploy_type = worker_data.deploy_type
86
        name = worker_data.processor_name
87
        worker_info = f"Processing Worker, deploy: {deploy_type}, name: {name}, host: {self.host}"
88
        logger.info(f"Deploying {worker_info}")
89
90
        connection_client = None
91
        if deploy_type == DeployType.NATIVE:
92
            assert self.ssh_client, "SSH client connection missing."
93
            connection_client = self.ssh_client
94
        if deploy_type == DeployType.DOCKER:
95
            assert self.docker_client, "Docker client connection missing."
96
            connection_client = self.docker_client
97
98
        worker_data.deploy_network_agent(logger, connection_client, mongodb_url, rabbitmq_url)
99
100
    def __deploy_all_workers(self, logger: Logger, mongodb_url: str, rabbitmq_url: str):
101
        logger.info(f"Deploying processing workers on host: {self.host}")
102
        amount_workers = len(self.workers_native) + len(self.workers_docker)
103
        if not amount_workers:
104
            logger.info("No processing workers found to be deployed")
105
        for data_worker in self.workers_native:
106
            self.__deploy_single_worker(logger, data_worker, mongodb_url, rabbitmq_url)
107
            logger.info(f"Deployed: {data_worker}")
108
        for data_worker in self.workers_docker:
109
            self.__deploy_single_worker(logger, data_worker, mongodb_url, rabbitmq_url)
110
            logger.info(f"Deployed: {data_worker}")
111
112
    def deploy_workers(self, logger: Logger, mongodb_url: str, rabbitmq_url: str) -> None:
113
        if self.needs_ssh_connector and not self.ssh_client:
114
            logger.debug("Creating missing ssh connector before deploying")
115
            client = self.create_connection_client(client_type="ssh")
116
            assert isinstance(client, SSHClient)
117
            self.ssh_client = client
118
        if self.needs_docker_connector:
119
            logger.debug("Creating missing docker connector before deploying")
120
            client = self.create_connection_client(client_type="docker")
121
            assert isinstance(client, CustomDockerClient)
122
            self.docker_client = client
123
124
        self.__deploy_network_agent_resource_manager_server(logger)
125
        self.__deploy_all_workers(logger=logger, mongodb_url=mongodb_url, rabbitmq_url=rabbitmq_url)
126
127
        if self.ssh_client:
128
            self.ssh_client.close()
129
            self.ssh_client = None
130
        if self.docker_client:
131
            self.docker_client.close()
132
            self.docker_client = None
133
134
    def __stop_network_agent_resource_manager_server(self, logger: Logger):
135
        logger.info(f"Stopping OCR-D Resource Manager Server [{self.resource_manager_pid}]: "
136
                    f"{self.host}:{self.resource_manager_port}")
137
        assert self.ssh_client, "SSH client connection missing"
138
        self.ssh_client.exec_command(f"kill {self.resource_manager_pid}")
139
140
    def __stop_worker(self, logger: Logger, name: str, deploy_type: DeployType, pid: str):
141
        worker_info = f"Processing Worker: deploy: {deploy_type}, name: {name}"
142
        if not pid:
143
            logger.warning(f"No pid was passed for {worker_info}")
144
            return
145
        worker_info += f", pid: {pid}"
146
        logger.info(f"Stopping {worker_info}")
147
        if deploy_type == DeployType.NATIVE:
148
            assert self.ssh_client, "SSH client connection missing"
149
            self.ssh_client.exec_command(f"kill {pid}")
150
        if deploy_type == DeployType.DOCKER:
151
            assert self.docker_client, "Docker client connection missing"
152
            self.docker_client.containers.get(pid).stop()
153
154
    def stop_workers(self, logger: Logger):
155
        if self.needs_ssh_connector and not self.ssh_client:
156
            logger.debug("Creating missing ssh connector before stopping")
157
            client = self.create_connection_client(client_type="ssh")
158
            assert isinstance(client, SSHClient)
159
            self.ssh_client = client
160
        if self.needs_docker_connector and not self.docker_client:
161
            logger.debug("Creating missing docker connector before stopping")
162
            client = self.create_connection_client(client_type="docker")
163
            assert isinstance(client, CustomDockerClient)
164
            self.docker_client = client
165
        self.__stop_network_agent_resource_manager_server(logger=logger)
166
167
        logger.info(f"Stopping processing workers on host: {self.host}")
168
        amount_workers = len(self.workers_native) + len(self.workers_docker)
169
        if not amount_workers:
170
            logger.warning("No active processing workers to be stopped.")
171
        for worker in self.workers_native:
172
            self.__stop_worker(logger, worker.processor_name, worker.deploy_type, worker.pid)
173
        self.workers_native = []
174
        for worker in self.workers_docker:
175
            self.__stop_worker(logger, worker.processor_name, worker.deploy_type, worker.pid)
176
        self.workers_docker = []
177
178
        if self.ssh_client:
179
            self.ssh_client.close()
180
            self.ssh_client = None
181
        if self.docker_client:
182
            self.docker_client.close()
183
            self.docker_client = None
184