Passed
Pull Request — master (#1338)
created by unknown at 02:09

ocrd_network.runtime_data.hosts   A

Complexity

Total Complexity 42

Size/Duplication

Total Lines 157
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 42
eloc 125
dl 0
loc 157
rs 9.0399
c 0
b 0
f 0

9 Methods

Rating   Name   Duplication   Size   Complexity  
A DataHost.__append_workers_to_lists() 0 10 5
B DataHost.deploy_workers() 0 16 6
A DataHost.__parse_workers() 0 8 3
C DataHost.stop_workers() 0 25 10
A DataHost.__deploy_single_worker() 0 19 3
A DataHost.__init__() 0 30 3
A DataHost.__deploy_all_workers() 0 9 4
A DataHost.create_connection_client() 0 9 4
A DataHost.__stop_worker() 0 13 4

How to fix   Complexity   

Complexity

Complex classes like ocrd_network.runtime_data.hosts often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from logging import Logger
2
from time import sleep
3
from typing import Dict, List
4
5
from .connection_clients import create_docker_client, create_ssh_client
6
from .network_agents import DataProcessingWorker, DeployType
7
8
9
class DataHost:
    """Represents a single deployment host and the Processing Workers configured for it.

    Workers are deployed either natively (started remotely over an SSH
    connection) or as docker containers (via a docker client connection).
    The class tracks which connector types are required by the parsed
    configuration, creates the clients lazily, deploys/stops the workers,
    and closes the connections afterwards.
    """

    def __init__(
        self, host: str, username: str, password: str, keypath: str, workers: List[Dict], servers: List[Dict]
    ) -> None:
        """Store credentials and parse the worker configuration for this host.

        :param host: address of the host
        :param username: login user for the ssh/docker connections
        :param password: login password (alternative to ``keypath``)
        :param keypath: path to a private key file (alternative to ``password``)
        :param workers: worker config dicts with keys ``name``,
            ``number_of_instance`` and optionally ``deploy_type``
        :param servers: server config dicts
        """
        self.host = host
        self.username = username
        self.password = password
        self.keypath = keypath

        # These flags are used to track whether a connection of the specified
        # type should be created based on the received config file
        self.needs_ssh_connector: bool = False
        self.needs_docker_connector: bool = False

        # Connection clients, ssh for native deployment, docker for docker deployment
        self.ssh_client = None
        self.docker_client = None

        # Time to wait between deploying single workers
        self.wait_between_deploys: float = 0.3

        # Lists of Processing Workers based on their deployment type
        self.workers_native = []
        self.workers_docker = []

        if not workers:
            workers = []
        # NOTE(review): `servers` is only normalized here and never used
        # afterwards in this class — confirm whether server parsing is
        # intentionally deferred.
        if not servers:
            servers = []

        self.__parse_workers(processing_workers=workers)

    def __append_workers_to_lists(self, worker_data: DataProcessingWorker) -> None:
        """Register a worker in its deploy-type list and flag the required connector.

        :raises ValueError: if the worker's deploy type is neither native nor docker
        """
        if worker_data.deploy_type == DeployType.NATIVE:
            # Native workers are started remotely over an ssh connection
            self.needs_ssh_connector = True
            self.workers_native.append(worker_data)
        elif worker_data.deploy_type == DeployType.DOCKER:
            # Docker workers are started through the docker daemon API
            self.needs_docker_connector = True
            self.workers_docker.append(worker_data)
        else:
            raise ValueError(f"Processing Worker deploy type is unknown: {worker_data.deploy_type}")

    def __parse_workers(self, processing_workers: List[Dict]) -> None:
        """Convert worker config dicts into registered DataProcessingWorker entries.

        Each config entry is registered ``number_of_instance`` times so that
        every instance is deployed and stopped individually.
        """
        for worker in processing_workers:
            worker_data = DataProcessingWorker(
                processor_name=worker["name"], deploy_type=worker.get("deploy_type", "native"),
                host=self.host, init_by_config=True, pid=None
            )
            # NOTE(review): the same worker_data object is appended for every
            # instance, so all instances share one pid field — verify intended.
            for _ in range(int(worker["number_of_instance"])):
                self.__append_workers_to_lists(worker_data=worker_data)

    def create_connection_client(self, client_type: str):
        """Create, cache and return a connection client of the given type.

        :param client_type: either ``"ssh"`` or ``"docker"``
        :return: the newly created client (also stored on the instance)
        :raises ValueError: for any other client type
        """
        if client_type == "ssh":
            self.ssh_client = create_ssh_client(self.host, self.username, self.password, self.keypath)
            return self.ssh_client
        if client_type == "docker":
            self.docker_client = create_docker_client(self.host, self.username, self.password, self.keypath)
            return self.docker_client
        raise ValueError(f"Host client type cannot be of type: {client_type}")

    def __deploy_single_worker(
        self, logger: Logger, worker_data: DataProcessingWorker,
        mongodb_url: str, rabbitmq_url: str
    ) -> None:
        """Deploy one worker over the connection client matching its deploy type.

        Sleeps ``wait_between_deploys`` seconds afterwards to pace deployments.
        """
        deploy_type = worker_data.deploy_type
        name = worker_data.processor_name
        worker_info = f"Processing Worker, deploy: {deploy_type}, name: {name}, host: {self.host}"
        logger.info(f"Deploying {worker_info}")

        connection_client = None
        if deploy_type == DeployType.NATIVE:
            assert self.ssh_client, "SSH client connection missing."
            connection_client = self.ssh_client
        if deploy_type == DeployType.DOCKER:
            assert self.docker_client, "Docker client connection missing."
            connection_client = self.docker_client

        worker_data.deploy_network_agent(logger, connection_client, mongodb_url, rabbitmq_url)
        sleep(self.wait_between_deploys)

    def __deploy_all_workers(self, logger: Logger, mongodb_url: str, rabbitmq_url: str) -> None:
        """Deploy all native workers, then all docker workers, of this host."""
        logger.info(f"Deploying processing workers on host: {self.host}")
        amount_workers = len(self.workers_native) + len(self.workers_docker)
        if not amount_workers:
            logger.info("No processing workers found to be deployed")
        for data_worker in self.workers_native:
            self.__deploy_single_worker(logger, data_worker, mongodb_url, rabbitmq_url)
        for data_worker in self.workers_docker:
            self.__deploy_single_worker(logger, data_worker, mongodb_url, rabbitmq_url)

    def deploy_workers(self, logger: Logger, mongodb_url: str, rabbitmq_url: str) -> None:
        """Create missing connectors, deploy all workers, then close connections.

        :param logger: logger to report progress to
        :param mongodb_url: URL of the MongoDB instance passed to each worker
        :param rabbitmq_url: URL of the RabbitMQ instance passed to each worker
        """
        if self.needs_ssh_connector and not self.ssh_client:
            logger.debug("Creating missing ssh connector before deploying")
            self.ssh_client = self.create_connection_client(client_type="ssh")
        # BUGFIX: previously an existing docker client was unconditionally
        # replaced (and leaked) here; now it is reused, consistent with the
        # ssh branch above and with stop_workers().
        if self.needs_docker_connector and not self.docker_client:
            logger.debug("Creating missing docker connector before deploying")
            self.docker_client = self.create_connection_client(client_type="docker")

        self.__deploy_all_workers(logger=logger, mongodb_url=mongodb_url, rabbitmq_url=rabbitmq_url)

        # Connections are not kept alive between deploy/stop cycles
        if self.ssh_client:
            self.ssh_client.close()
            self.ssh_client = None
        if self.docker_client:
            self.docker_client.close()
            self.docker_client = None

    def __stop_worker(self, logger: Logger, name: str, deploy_type: DeployType, pid: str) -> None:
        """Stop one worker by pid (native: kill; docker: stop container).

        Workers without a recorded pid are skipped with a warning.
        """
        worker_info = f"Processing Worker: deploy: {deploy_type}, name: {name}"
        if not pid:
            logger.warning(f"No pid was passed for {worker_info}")
            return
        worker_info += f", pid: {pid}"
        logger.info(f"Stopping {worker_info}")
        if deploy_type == DeployType.NATIVE:
            assert self.ssh_client, "SSH client connection missing"
            self.ssh_client.exec_command(f"kill {pid}")
        if deploy_type == DeployType.DOCKER:
            assert self.docker_client, "Docker client connection missing"
            # For docker deployments the pid field holds the container id
            self.docker_client.containers.get(pid).stop()

    def stop_workers(self, logger: Logger) -> None:
        """Create missing connectors, stop all workers, then close connections.

        Clears the native and docker worker lists after stopping.
        """
        if self.needs_ssh_connector and not self.ssh_client:
            logger.debug("Creating missing ssh connector before stopping")
            self.ssh_client = self.create_connection_client(client_type="ssh")
        if self.needs_docker_connector and not self.docker_client:
            logger.debug("Creating missing docker connector before stopping")
            self.docker_client = self.create_connection_client(client_type="docker")

        logger.info(f"Stopping processing workers on host: {self.host}")
        amount_workers = len(self.workers_native) + len(self.workers_docker)
        if not amount_workers:
            logger.warning("No active processing workers to be stopped.")
        for worker in self.workers_native:
            self.__stop_worker(logger, worker.processor_name, worker.deploy_type, worker.pid)
        self.workers_native = []
        for worker in self.workers_docker:
            self.__stop_worker(logger, worker.processor_name, worker.deploy_type, worker.pid)
        self.workers_docker = []

        if self.ssh_client:
            self.ssh_client.close()
            self.ssh_client = None
        if self.docker_client:
            self.docker_client.close()
            self.docker_client = None