Passed
Pull Request — master (#1030)
by Konstantin
05:34 queued 02:38

ocrd_network.deployer (Rating: F)

Complexity

Total Complexity 80

Size/Duplication

Total Lines 488
Duplicated Lines 0 %

Importance

Changes 0

Metric   Value
wmc      80
eloc     346
dl       0
loc      488
rs       2
c        0
b        0
f        0

17 Methods

Rating   Name   Duplication   Size   Complexity  
B Deployer.deploy_rabbitmq() 0 59 4
B Deployer.kill_processing_workers() 0 16 6
B Deployer.kill_hosts() 0 23 6
B Deployer.kill_processor_servers() 0 16 6
A Deployer.start_docker_processor() 0 15 1
A Deployer.start_native_processor_server() 0 21 1
A Deployer.start_native_processor() 0 35 1
A Deployer.kill_rabbitmq() 0 14 2
B Deployer.deploy_mongodb() 0 38 6
A Deployer.kill_mongodb() 0 14 2
A Deployer.resolve_processor_server_url() 0 7 4
B Deployer.deploy_hosts() 0 36 8
A Deployer.kill_all() 0 10 1
A Deployer.__init__() 0 9 2
F Deployer.find_matching_processors() 0 57 22
A Deployer._deploy_processing_worker() 0 31 3
B Deployer._deploy_processor_server() 0 32 5

How to fix: Complexity

Complex classes like ocrd_network.deployer often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
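
In ocrd_network.deployer, for example, deploy_rabbitmq() and deploy_mongodb() share the deploy_ prefix and nearly identical container-start logic. The following sketch shows how an Extract Class refactoring could look; the helper name DockerServiceStarter is hypothetical and not part of this pull request:

    # Hypothetical sketch only: the container-start steps shared by
    # deploy_rabbitmq() and deploy_mongodb() move into one cohesive class.
    class DockerServiceStarter:
        def __init__(self, address, ssh_username, ssh_password, ssh_keypath):
            self.address = address
            self.ssh_username = ssh_username
            self.ssh_password = ssh_password
            self.ssh_keypath = ssh_keypath

        def start(self, image: str, detach: bool, remove: bool,
                  ports_mapping: dict, environment: list = None) -> str:
            # Same docker SDK calls as in deploy_rabbitmq()/deploy_mongodb()
            client = create_docker_client(
                self.address, self.ssh_username, self.ssh_password, self.ssh_keypath)
            res = client.containers.run(
                image=image, detach=detach, remove=remove,
                ports=ports_mapping, environment=environment or [])
            if not res or not res.id:
                raise RuntimeError(f'Failed to start {image} on host: {self.address}')
            client.close()
            return res.id  # container id, stored by the callers as the service pid

deploy_rabbitmq() and deploy_mongodb() would then shrink to building the ports mapping and environment and delegating to start().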

"""
Abstraction of the deployment functionality for processors.

The Processing Server provides the configuration parameters to the Deployer agent.
The Deployer agent runs the RabbitMQ Server, MongoDB and the Processing Hosts.
Each Processing Host may have several Processing Workers.
Each Processing Worker is an instance of an OCR-D processor.
"""
from __future__ import annotations
from typing import Dict, List, Union
from re import search as re_search
from os import getpid
from time import sleep

from ocrd_utils import getLogger

from .deployment_utils import (
    create_docker_client,
    DeployType,
    wait_for_rabbitmq_availability
)

from .runtime_data import (
    DataHost,
    DataMongoDB,
    DataProcessingWorker,
    DataProcessorServer,
    DataRabbitMQ
)
from .utils import validate_and_load_config


class Deployer:
    def __init__(self, config_path: str) -> None:
        self.log = getLogger(__name__)
        config = validate_and_load_config(config_path)

        self.data_mongo: DataMongoDB = DataMongoDB(config['database'])
        self.data_queue: DataRabbitMQ = DataRabbitMQ(config['process_queue'])
        self.data_hosts: List[DataHost] = []
        for config_host in config['hosts']:
            self.data_hosts.append(DataHost(config_host))

    # TODO: Reconsider this.
    def find_matching_processors(
            self,
            worker_only: bool = False,
            server_only: bool = False,
            docker_only: bool = False,
            native_only: bool = False,
            str_names_only: bool = False,
            unique_only: bool = False
    ) -> Union[List[str], List[object]]:
        """Finds and returns a list of matching data objects of type:
        `DataProcessingWorker` and `DataProcessorServer`.

        :py:attr:`worker_only` match only processors with worker status
        :py:attr:`server_only` match only processors with server status
        :py:attr:`docker_only` match only docker processors
        :py:attr:`native_only` match only native processors
        :py:attr:`str_names_only` return the processor_name instead of the data object
        :py:attr:`unique_only` remove duplicates from the matches

        `worker_only` and `server_only` are mutually exclusive
        `docker_only` and `native_only` are mutually exclusive
        `unique_only` is allowed only together with `str_names_only`
        """

        if worker_only and server_only:
            raise ValueError("Only 'worker_only' or 'server_only' is allowed, not both.")
        if docker_only and native_only:
            raise ValueError("Only 'docker_only' or 'native_only' is allowed, not both.")
        if not str_names_only and unique_only:
            raise ValueError("Value 'unique_only' is allowed only together with 'str_names_only'")

        # Find all matching objects of type:
        # DataProcessingWorker or DataProcessorServer
        matched_objects = []
        for data_host in self.data_hosts:
            if not server_only:
                for data_worker in data_host.data_workers:
                    if data_worker.deploy_type == DeployType.NATIVE and docker_only:
                        continue
                    if data_worker.deploy_type == DeployType.DOCKER and native_only:
                        continue
                    matched_objects.append(data_worker)
            if not worker_only:
                for data_server in data_host.data_servers:
                    if data_server.deploy_type == DeployType.NATIVE and docker_only:
                        continue
                    if data_server.deploy_type == DeployType.DOCKER and native_only:
                        continue
                    matched_objects.append(data_server)
        if str_names_only:
            # get only the processor names of the matched objects
            name_list = [match.processor_name for match in matched_objects]
            if unique_only:
                # remove the duplicates, if any
                return list(dict.fromkeys(name_list))
            return name_list
        return matched_objects

    def resolve_processor_server_url(self, processor_name) -> str:
        processor_server_url = ''
        for data_host in self.data_hosts:
            for data_server in data_host.data_servers:
                if data_server.processor_name == processor_name:
                    processor_server_url = f'http://{data_host.address}:{data_server.port}/'
        return processor_server_url

    def kill_all(self) -> None:
        """Kill all started services: hosts, database, queue.

        The order of killing is important for a future graceful-shutdown optimization.
        If the RabbitMQ server is killed before the Processing Workers, the workers may
        be left in an unpredictable state.
        """
        self.kill_hosts()
        self.kill_mongodb()
        self.kill_rabbitmq()

    def deploy_hosts(
            self,
            mongodb_url: str,
            rabbitmq_url: str
    ) -> None:
        for host_data in self.data_hosts:
            if host_data.needs_ssh:
                host_data.create_client(client_type='ssh')
                assert host_data.ssh_client
            if host_data.needs_docker:
                host_data.create_client(client_type='docker')
                assert host_data.docker_client

            self.log.debug(f'Deploying processing workers on host: {host_data.address}')
            for data_worker in host_data.data_workers:
                self._deploy_processing_worker(
                    mongodb_url,
                    rabbitmq_url,
                    host_data,
                    data_worker
                )

            self.log.debug(f'Deploying processor servers on host: {host_data.address}')
            for data_server in host_data.data_servers:
                self._deploy_processor_server(
                    mongodb_url,
                    host_data,
                    data_server
                )

            if host_data.ssh_client:
                host_data.ssh_client.close()
                host_data.ssh_client = None
            if host_data.docker_client:
                host_data.docker_client.close()
                host_data.docker_client = None

    def _deploy_processing_worker(
            self,
            mongodb_url: str,
            rabbitmq_url: str,
            host_data: DataHost,
            data_worker: DataProcessingWorker
    ) -> None:
        self.log.debug(f"Deploying processing worker, "
                       f"environment: '{data_worker.deploy_type}', "
                       f"name: '{data_worker.processor_name}', "
                       f"address: '{host_data.address}'")

        if data_worker.deploy_type == DeployType.NATIVE:
            assert host_data.ssh_client  # to satisfy mypy
            pid = self.start_native_processor(
                ssh_client=host_data.ssh_client,
                processor_name=data_worker.processor_name,
                queue_url=rabbitmq_url,
                database_url=mongodb_url,
            )
            data_worker.pid = pid
        elif data_worker.deploy_type == DeployType.DOCKER:
            assert host_data.docker_client  # to satisfy mypy
            pid = self.start_docker_processor(
                docker_client=host_data.docker_client,
                processor_name=data_worker.processor_name,
                _queue_url=rabbitmq_url,
                _database_url=mongodb_url
            )
            data_worker.pid = pid
        sleep(0.2)

    # TODO: Revisit this to remove code duplication across the deploy_* methods
    def _deploy_processor_server(
            self,
            mongodb_url: str,
            host_data: DataHost,
            data_server: DataProcessorServer,
    ) -> None:
        self.log.debug(f"Deploying processor server, "
                       f"environment: '{data_server.deploy_type}', "
                       f"name: '{data_server.processor_name}', "
                       f"address: '{data_server.host}:{data_server.port}'")

        if data_server.deploy_type == DeployType.NATIVE:
            assert host_data.ssh_client
            pid = self.start_native_processor_server(
                ssh_client=host_data.ssh_client,
                processor_name=data_server.processor_name,
                agent_address=f'{data_server.host}:{data_server.port}',
                database_url=mongodb_url,
            )
            data_server.pid = pid

            if data_server.processor_name in host_data.server_ports:
                name = data_server.processor_name
                port = data_server.port
                if host_data.server_ports[name]:
                    # list.append() mutates in place and returns None,
                    # so its result must not be assigned back
                    host_data.server_ports[name].append(port)
                else:
                    host_data.server_ports[name] = [port]
            else:
                host_data.server_ports[data_server.processor_name] = [data_server.port]
        elif data_server.deploy_type == DeployType.DOCKER:
            raise Exception("Deploying docker processor server is not supported yet!")

    def deploy_rabbitmq(
            self,
            image: str,
            detach: bool,
            remove: bool,
            ports_mapping: Union[Dict, None] = None
    ) -> str:
        self.log.debug(f"Trying to deploy '{image}', with modes: "
                       f"detach='{detach}', remove='{remove}'")

        if not self.data_queue or not self.data_queue.address:
            raise ValueError('Deploying RabbitMQ has failed - missing configuration.')

        client = create_docker_client(
            self.data_queue.address,
            self.data_queue.ssh_username,
            self.data_queue.ssh_password,
            self.data_queue.ssh_keypath
        )
        if not ports_mapping:
            # 5672, 5671 - used by AMQP 0-9-1 and AMQP 1.0 clients without and with TLS
            # 15672, 15671: HTTP API clients, management UI and rabbitmq admin, without and with TLS
            # 25672: used for internode and CLI tools communication and is allocated from
            # a dynamic range (limited to a single port by default, computed as AMQP port + 20000)
            ports_mapping = {
                5672: self.data_queue.port,
                15672: 15672,
                25672: 25672
            }
        res = client.containers.run(
            image=image,
            detach=detach,
            remove=remove,
            ports=ports_mapping,
            # The default credentials to be used by the processing workers
            environment=[
                f'RABBITMQ_DEFAULT_USER={self.data_queue.username}',
                f'RABBITMQ_DEFAULT_PASS={self.data_queue.password}'
            ]
        )
        assert res and res.id, \
            f'Failed to start RabbitMQ docker container on host: {self.data_queue.address}'
        self.data_queue.pid = res.id
        client.close()

        rmq_host = self.data_queue.address
        rmq_port = int(self.data_queue.port)
        rmq_vhost = '/'

        wait_for_rabbitmq_availability(
            host=rmq_host,
            port=rmq_port,
            vhost=rmq_vhost,
            username=self.data_queue.username,
            password=self.data_queue.password
        )
        self.log.info(f'The RabbitMQ server was deployed on URL: '
                      f'{rmq_host}:{rmq_port}{rmq_vhost}')
        return self.data_queue.url

    def deploy_mongodb(
            self,
            image: str,
            detach: bool,
            remove: bool,
            ports_mapping: Union[Dict, None] = None
    ) -> str:
        self.log.debug(f"Trying to deploy '{image}', with modes: "
                       f"detach='{detach}', remove='{remove}'")

        if not self.data_mongo or not self.data_mongo.address:
            raise ValueError('Deploying MongoDB has failed - missing configuration.')

        client = create_docker_client(
            self.data_mongo.address,
            self.data_mongo.ssh_username,
            self.data_mongo.ssh_password,
            self.data_mongo.ssh_keypath
        )
        if not ports_mapping:
            ports_mapping = {
                27017: self.data_mongo.port
            }
        res = client.containers.run(
            image=image,
            detach=detach,
            remove=remove,
            ports=ports_mapping
        )
        if not res or not res.id:
            raise RuntimeError('Failed to start MongoDB docker container on host: '
                               f'{self.data_mongo.address}')
        self.data_mongo.pid = res.id
        client.close()

        mongodb_hostinfo = f'{self.data_mongo.address}:{self.data_mongo.port}'
        self.log.info(f'The MongoDB was deployed on host: {mongodb_hostinfo}')
        return self.data_mongo.url

    def kill_rabbitmq(self) -> None:
        if not self.data_queue.pid:
            self.log.warning('No running RabbitMQ instance found')
            return
        client = create_docker_client(
            self.data_queue.address,
            self.data_queue.ssh_username,
            self.data_queue.ssh_password,
            self.data_queue.ssh_keypath
        )
        client.containers.get(self.data_queue.pid).stop()
        self.data_queue.pid = None
        client.close()
        self.log.info('The RabbitMQ is stopped')

    def kill_mongodb(self) -> None:
        if not self.data_mongo.pid:
            self.log.warning('No running MongoDB instance found')
            return
        client = create_docker_client(
            self.data_mongo.address,
            self.data_mongo.ssh_username,
            self.data_mongo.ssh_password,
            self.data_mongo.ssh_keypath
        )
        client.containers.get(self.data_mongo.pid).stop()
        self.data_mongo.pid = None
        client.close()
        self.log.info('The MongoDB is stopped')

    def kill_hosts(self) -> None:
        self.log.debug('Starting to kill/stop hosts')
        # Kill processing hosts
        for host_data in self.data_hosts:
            if host_data.needs_ssh:
                host_data.create_client(client_type='ssh')
                assert host_data.ssh_client
            if host_data.needs_docker:
                host_data.create_client(client_type='docker')
                assert host_data.docker_client

            self.log.debug(f'Killing/Stopping processing workers on host: {host_data.address}')
            self.kill_processing_workers(host_data)

            self.log.debug(f'Killing/Stopping processor servers on host: {host_data.address}')
            self.kill_processor_servers(host_data)

            if host_data.ssh_client:
                host_data.ssh_client.close()
                host_data.ssh_client = None
            if host_data.docker_client:
                host_data.docker_client.close()
                host_data.docker_client = None

    # TODO: Optimize the code duplication between the start_* and kill_* methods
    def kill_processing_workers(self, host_data: DataHost) -> None:
        amount = len(host_data.data_workers)
        if not amount:
            self.log.info('No active processing workers to be stopped.')
            return
        self.log.info(f"Trying to stop {amount} processing workers:")
        for worker in host_data.data_workers:
            if not worker.pid:
                continue
            if worker.deploy_type == DeployType.NATIVE:
                host_data.ssh_client.exec_command(f'kill {worker.pid}')
                self.log.info(f"Stopped native worker with pid: '{worker.pid}'")
            elif worker.deploy_type == DeployType.DOCKER:
                host_data.docker_client.containers.get(worker.pid).stop()
                self.log.info(f"Stopped docker worker with container id: '{worker.pid}'")
        host_data.data_workers = []

    def kill_processor_servers(self, host_data: DataHost) -> None:
        amount = len(host_data.data_servers)
        if not amount:
            self.log.info('No active processor servers to be stopped.')
            return
        self.log.info(f"Trying to stop {amount} processor servers:")
        for server in host_data.data_servers:
            if not server.pid:
                continue
            if server.deploy_type == DeployType.NATIVE:
                host_data.ssh_client.exec_command(f'kill {server.pid}')
                self.log.info(f"Stopped native server with pid: '{server.pid}'")
            elif server.deploy_type == DeployType.DOCKER:
                host_data.docker_client.containers.get(server.pid).stop()
                self.log.info(f"Stopped docker server with container id: '{server.pid}'")
        host_data.data_servers = []

    def start_native_processor(
            self,
            ssh_client,
            processor_name: str,
            queue_url: str,
            database_url: str
    ) -> str:
        """Start a processor natively on a host via ssh

        Args:
            ssh_client:         paramiko SSHClient to execute commands on a host
            processor_name:     name of the processor to run
            queue_url:          url of the RabbitMQ server
            database_url:       url of the MongoDB database

        Returns:
            str: pid of the running process
        """
        self.log.info(f'Starting native processing worker: {processor_name}')
        channel = ssh_client.invoke_shell()
        stdin, stdout = channel.makefile('wb'), channel.makefile('rb')
        cmd = f'{processor_name} --type worker --database {database_url} --queue {queue_url}'
        # The only way found to start a process in the background and return early is this
        # construction. The pid of the last started background process is printed with `echo $!`,
        # but it is printed in between other output. Because of that, `xyz` is added before and
        # after the pid so it can easily be filtered out via regex when returning from the function.
        log_path = '/tmp/ocrd-processing-server-startup.log'
        stdin.write(f"echo starting processing worker with '{cmd}' >> '{log_path}'\n")
        stdin.write(f'{cmd} >> {log_path} 2>&1 &\n')
        stdin.write('echo xyz$!xyz \n exit \n')
        output = stdout.read().decode('utf-8')
        stdout.close()
        stdin.close()
        return re_search(r'xyz([0-9]+)xyz', output).group(1)  # type: ignore

    def start_docker_processor(
            self,
            docker_client,
            processor_name: str,
            _queue_url: str,
            _database_url: str
    ) -> str:
        # TODO: Raise an exception here as well?
        #  raise Exception("Deploying docker processing worker is not supported yet!")

        self.log.info(f'Starting docker container processor: {processor_name}')
        # TODO: add the real command to start the processing worker in docker here
        res = docker_client.containers.run('debian', 'sleep 500s', detach=True, remove=True)
        assert res and res.id, f'Running processor: {processor_name} in docker-container failed'
        return res.id

    # TODO: Just a copy of the above start_native_processor() method.
    #  Far from being great... But should be good as a starting point
    def start_native_processor_server(
            self,
            ssh_client,
            processor_name: str,
            agent_address: str,
            database_url: str
    ) -> str:
        self.log.info(f"Starting native processor server: {processor_name} on {agent_address}")
        channel = ssh_client.invoke_shell()
        stdin, stdout = channel.makefile('wb'), channel.makefile('rb')
        cmd = f'{processor_name} --type server --address {agent_address} --database {database_url}'
        port = agent_address.split(':')[1]
        log_path = f'/tmp/server_{processor_name}_{port}_{getpid()}.log'
        # TODO: This entire stdin/stdout thing is broken with servers!
        stdin.write(f"echo starting processor server with '{cmd}' >> '{log_path}'\n")
        stdin.write(f'{cmd} >> {log_path} 2>&1 &\n')
        stdin.write('echo xyz$!xyz \n exit \n')
        output = stdout.read().decode('utf-8')
        stdout.close()
        stdin.close()
        return re_search(r'xyz([0-9]+)xyz', output).group(1)  # type: ignore
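
For context, the Processing Server is expected to drive this class roughly as follows (a minimal sketch; the config path and Docker image names below are placeholders, not values taken from this pull request):

    from ocrd_network.deployer import Deployer

    deployer = Deployer('processing_server_config.yaml')  # hypothetical config path
    rabbitmq_url = deployer.deploy_rabbitmq(image='rabbitmq:3-management', detach=True, remove=True)
    mongodb_url = deployer.deploy_mongodb(image='mongo', detach=True, remove=True)
    # Start all configured processing workers and processor servers on the hosts
    deployer.deploy_hosts(mongodb_url=mongodb_url, rabbitmq_url=rabbitmq_url)
    # ... jobs are queued and processed ...
    deployer.kill_all()  # stops hosts first, then MongoDB, then RabbitMQ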