Passed: Push to master ( 670862...a75791 ) by Konstantin, created 02:31

ocrd_network.deployer.Deployer.kill_hosts()   Grade: B

Complexity:  Conditions 6
Size:        Total Lines 23, Code Lines 19
Duplication: Lines 0, Ratio 0 %
Importance:  Changes 0

Raw metrics: eloc 19, dl 0, loc 23, rs 8.5166, c 0, b 0, f 0, cc 6, nop 1
"""
Abstraction of the deployment functionality for processors.

The Processing Server provides the configuration parameters to the Deployer agent.
The Deployer agent runs the RabbitMQ Server, MongoDB and the Processing Hosts.
Each Processing Host may have several Processing Workers.
Each Processing Worker is an instance of an OCR-D processor.
"""
from __future__ import annotations
from typing import Dict, List, Union
from re import search as re_search
from os import getpid
from time import sleep

from ocrd_utils import getLogger

from .deployment_utils import (
    create_docker_client,
    DeployType,
    verify_mongodb_available,
    verify_rabbitmq_available,
)

from .runtime_data import (
    DataHost,
    DataMongoDB,
    DataProcessingWorker,
    DataProcessorServer,
    DataRabbitMQ
)
from .utils import validate_and_load_config


class Deployer:
    def __init__(self, config_path: str) -> None:
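        """Load and validate the Processing Server configuration from `config_path`
        and build the runtime data objects for MongoDB, RabbitMQ and the hosts.
        """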
        self.log = getLogger(__name__)
        config = validate_and_load_config(config_path)

        self.data_mongo: DataMongoDB = DataMongoDB(config['database'])
        self.data_queue: DataRabbitMQ = DataRabbitMQ(config['process_queue'])
        self.data_hosts: List[DataHost] = []
        for config_host in config['hosts']:
            self.data_hosts.append(DataHost(config_host))

    # TODO: Reconsider this.
    def find_matching_processors(
            self,
            worker_only: bool = False,
            server_only: bool = False,
            docker_only: bool = False,
            native_only: bool = False,
            str_names_only: bool = False,
            unique_only: bool = False
    ) -> Union[List[str], List[object]]:
        """Finds and returns a list of matching data objects of type:
        `DataProcessingWorker` and `DataProcessorServer`.

        :py:attr:`worker_only` match only processors with worker status
        :py:attr:`server_only` match only processors with server status
        :py:attr:`docker_only` match only docker processors
        :py:attr:`native_only` match only native processors
        :py:attr:`str_names_only` return the processor names instead of the data objects
        :py:attr:`unique_only` remove duplicates from the matches

        `worker_only` and `server_only` are mutually exclusive
        `docker_only` and `native_only` are mutually exclusive
        `unique_only` is allowed only together with `str_names_only`
        """
        if worker_only and server_only:
            raise ValueError("Only 'worker_only' or 'server_only' is allowed, not both.")
        if docker_only and native_only:
            raise ValueError("Only 'docker_only' or 'native_only' is allowed, not both.")
        if not str_names_only and unique_only:
            raise ValueError("Value 'unique_only' is allowed only together with 'str_names_only'")

        # Find all matching objects of type:
        # DataProcessingWorker or DataProcessorServer
        matched_objects = []
        for data_host in self.data_hosts:
            if not server_only:
                for data_worker in data_host.data_workers:
                    if data_worker.deploy_type == DeployType.NATIVE and docker_only:
                        continue
                    if data_worker.deploy_type == DeployType.DOCKER and native_only:
                        continue
                    matched_objects.append(data_worker)
            if not worker_only:
                for data_server in data_host.data_servers:
                    if data_server.deploy_type == DeployType.NATIVE and docker_only:
                        continue
                    if data_server.deploy_type == DeployType.DOCKER and native_only:
                        continue
                    matched_objects.append(data_server)
        if str_names_only:
            # gets only the processor names of the matched objects
            name_list = [match.processor_name for match in matched_objects]
            if unique_only:
                # removes the duplicates, if any
                return list(dict.fromkeys(name_list))
            return name_list
        return matched_objects

    def resolve_processor_server_url(self, processor_name: str) -> str:
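        """Return the URL of the Processor Server deployed for `processor_name`,
        or an empty string if no such server is configured on any host.
        """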
        processor_server_url = ''
        for data_host in self.data_hosts:
            for data_server in data_host.data_servers:
                if data_server.processor_name == processor_name:
                    processor_server_url = f'http://{data_host.address}:{data_server.port}/'
        return processor_server_url

    def kill_all(self) -> None:
        """Kill all started services: hosts, database, queue.

        The order of killing matters for a graceful shutdown: if the RabbitMQ server
        is killed before the Processing Workers, the workers may be left in an
        unpredictable state.
        """
        self.kill_hosts()
        self.kill_mongodb()
        self.kill_rabbitmq()

    def deploy_hosts(
            self,
            mongodb_url: str,
            rabbitmq_url: str
    ) -> None:
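        """Deploy all Processing Workers and Processor Servers on the configured hosts.

        For every host, an SSH and/or Docker client is created as needed, the workers
        and servers are started with the given MongoDB and RabbitMQ URLs, and the
        clients are closed again afterwards.
        """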
        for host_data in self.data_hosts:
            if host_data.needs_ssh:
                host_data.create_client(client_type='ssh')
                assert host_data.ssh_client
            if host_data.needs_docker:
                host_data.create_client(client_type='docker')
                assert host_data.docker_client

            self.log.debug(f'Deploying processing workers on host: {host_data.address}')
            for data_worker in host_data.data_workers:
                self._deploy_processing_worker(
                    mongodb_url,
                    rabbitmq_url,
                    host_data,
                    data_worker
                )

            self.log.debug(f'Deploying processor servers on host: {host_data.address}')
            for data_server in host_data.data_servers:
                self._deploy_processor_server(
                    mongodb_url,
                    host_data,
                    data_server
                )

            if host_data.ssh_client:
                host_data.ssh_client.close()
                host_data.ssh_client = None
            if host_data.docker_client:
                host_data.docker_client.close()
                host_data.docker_client = None

    def _deploy_processing_worker(
            self,
            mongodb_url: str,
            rabbitmq_url: str,
            host_data: DataHost,
            data_worker: DataProcessingWorker
    ) -> None:
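        """Start a single Processing Worker on `host_data`, either natively via SSH
        or inside a Docker container, and remember its PID / container id.
        """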
        self.log.debug(f"Deploying processing worker, "
                       f"environment: '{data_worker.deploy_type}', "
                       f"name: '{data_worker.processor_name}', "
                       f"address: '{host_data.address}'")

        if data_worker.deploy_type == DeployType.NATIVE:
            assert host_data.ssh_client  # to satisfy mypy
            pid = self.start_native_processor(
                ssh_client=host_data.ssh_client,
                processor_name=data_worker.processor_name,
                queue_url=rabbitmq_url,
                database_url=mongodb_url,
            )
            data_worker.pid = pid
        elif data_worker.deploy_type == DeployType.DOCKER:
            assert host_data.docker_client  # to satisfy mypy
            pid = self.start_docker_processor(
                docker_client=host_data.docker_client,
                processor_name=data_worker.processor_name,
                _queue_url=rabbitmq_url,
                _database_url=mongodb_url
            )
            data_worker.pid = pid
        sleep(0.2)

    # TODO: Revisit this to remove code duplication in the deploy_* methods
    def _deploy_processor_server(
            self,
            mongodb_url: str,
            host_data: DataHost,
            data_server: DataProcessorServer,
    ) -> None:
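        """Start a single Processor Server on `host_data` (currently only native
        deployment via SSH is supported) and register its PID and port.
        """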
        self.log.debug(f"Deploying processor server, "
                       f"environment: '{data_server.deploy_type}', "
                       f"name: '{data_server.processor_name}', "
                       f"address: '{data_server.host}:{data_server.port}'")

        if data_server.deploy_type == DeployType.NATIVE:
            assert host_data.ssh_client
            pid = self.start_native_processor_server(
                ssh_client=host_data.ssh_client,
                processor_name=data_server.processor_name,
                agent_address=f'{data_server.host}:{data_server.port}',
                database_url=mongodb_url,
            )
            data_server.pid = pid

            if data_server.processor_name in host_data.server_ports:
                name = data_server.processor_name
                port = data_server.port
                if host_data.server_ports[name]:
                    # append in place - list.append() returns None, so do not re-assign
                    host_data.server_ports[name].append(port)
                else:
                    host_data.server_ports[name] = [port]
            else:
                host_data.server_ports[data_server.processor_name] = [data_server.port]
        elif data_server.deploy_type == DeployType.DOCKER:
            raise Exception("Deploying docker processor server is not supported yet!")

    def deploy_rabbitmq(
            self,
            image: str,
            detach: bool,
            remove: bool,
            ports_mapping: Union[Dict, None] = None
    ) -> str:
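        """Deploy the RabbitMQ server as a Docker container (or, if deployment is
        skipped in the configuration, only verify that an externally managed
        instance is reachable) and return its URL.
        """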
        if self.data_queue.skip_deployment:
            self.log.debug("RabbitMQ is externally managed. Skipping deployment")
            verify_rabbitmq_available(
                self.data_queue.address,
                self.data_queue.port,
                self.data_queue.vhost,
                self.data_queue.username,
                self.data_queue.password
            )
            return self.data_queue.url
        self.log.debug(f"Trying to deploy '{image}', with modes: "
                       f"detach='{detach}', remove='{remove}'")

        if not self.data_queue or not self.data_queue.address:
            raise ValueError('Deploying RabbitMQ has failed - missing configuration.')

        client = create_docker_client(
            self.data_queue.address,
            self.data_queue.ssh_username,
            self.data_queue.ssh_password,
            self.data_queue.ssh_keypath
        )
        if not ports_mapping:
            # 5672, 5671 - used by AMQP 0-9-1 and AMQP 1.0 clients without and with TLS
            # 15672, 15671: HTTP API clients, management UI and rabbitmq admin, without and with TLS
            # 25672: used for internode and CLI tools communication and is allocated from
            # a dynamic range (limited to a single port by default, computed as AMQP port + 20000)
            ports_mapping = {
                5672: self.data_queue.port,
                15672: 15672,
                25672: 25672
            }
        res = client.containers.run(
            image=image,
            detach=detach,
            remove=remove,
            ports=ports_mapping,
            # The default credentials to be used by the processing workers
            environment=[
                f'RABBITMQ_DEFAULT_USER={self.data_queue.username}',
                f'RABBITMQ_DEFAULT_PASS={self.data_queue.password}'
            ]
        )
        assert res and res.id, \
            f'Failed to start RabbitMQ docker container on host: {self.data_queue.address}'
        self.data_queue.pid = res.id
        client.close()

        rmq_host = self.data_queue.address
        rmq_port = int(self.data_queue.port)
        rmq_vhost = '/'

        verify_rabbitmq_available(
            host=rmq_host,
            port=rmq_port,
            vhost=rmq_vhost,
            username=self.data_queue.username,
            password=self.data_queue.password
        )
        self.log.info(f'The RabbitMQ server was deployed on URL: '
                      f'{rmq_host}:{rmq_port}{rmq_vhost}')
        return self.data_queue.url

    def deploy_mongodb(
            self,
            image: str,
            detach: bool,
            remove: bool,
            ports_mapping: Union[Dict, None] = None
    ) -> str:
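        """Deploy MongoDB as a Docker container (or, if deployment is skipped in
        the configuration, only verify that an externally managed instance is
        reachable) and return its URL.
        """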
        if self.data_mongo.skip_deployment:
            self.log.debug('MongoDB is externally managed. Skipping deployment')
            verify_mongodb_available(self.data_mongo.url)
            return self.data_mongo.url

        self.log.debug(f"Trying to deploy '{image}', with modes: "
                       f"detach='{detach}', remove='{remove}'")

        if not self.data_mongo or not self.data_mongo.address:
            raise ValueError('Deploying MongoDB has failed - missing configuration.')

        client = create_docker_client(
            self.data_mongo.address,
            self.data_mongo.ssh_username,
            self.data_mongo.ssh_password,
            self.data_mongo.ssh_keypath
        )
        if not ports_mapping:
            ports_mapping = {
                27017: self.data_mongo.port
            }
        if self.data_mongo.username:
            environment = [
                f'MONGO_INITDB_ROOT_USERNAME={self.data_mongo.username}',
                f'MONGO_INITDB_ROOT_PASSWORD={self.data_mongo.password}'
            ]
        else:
            environment = []

        res = client.containers.run(
            image=image,
            detach=detach,
            remove=remove,
            ports=ports_mapping,
            environment=environment
        )
        if not res or not res.id:
            raise RuntimeError('Failed to start MongoDB docker container on host: '
                               f'{self.data_mongo.address}')
        self.data_mongo.pid = res.id
        client.close()

        mongodb_hostinfo = f'{self.data_mongo.address}:{self.data_mongo.port}'
        self.log.info(f'The MongoDB was deployed on host: {mongodb_hostinfo}')
        return self.data_mongo.url

    def kill_rabbitmq(self) -> None:
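        """Stop the RabbitMQ Docker container, unless RabbitMQ is externally managed
        or no container was started by this Deployer.
        """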
        if self.data_queue.skip_deployment:
            return
        elif not self.data_queue.pid:
            self.log.warning('No running RabbitMQ instance found')
            return
        client = create_docker_client(
            self.data_queue.address,
            self.data_queue.ssh_username,
            self.data_queue.ssh_password,
            self.data_queue.ssh_keypath
        )
        client.containers.get(self.data_queue.pid).stop()
        self.data_queue.pid = None
        client.close()
        self.log.info('The RabbitMQ server is stopped')

    def kill_mongodb(self) -> None:
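        """Stop the MongoDB Docker container, unless MongoDB is externally managed
        or no container was started by this Deployer.
        """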
        if self.data_mongo.skip_deployment:
            return
        elif not self.data_mongo.pid:
            self.log.warning('No running MongoDB instance found')
            return
        client = create_docker_client(
            self.data_mongo.address,
            self.data_mongo.ssh_username,
            self.data_mongo.ssh_password,
            self.data_mongo.ssh_keypath
        )
        client.containers.get(self.data_mongo.pid).stop()
        self.data_mongo.pid = None
        client.close()
        self.log.info('The MongoDB server is stopped')

    def kill_hosts(self) -> None:
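        """Stop all Processing Workers and Processor Servers on every configured host,
        creating (and afterwards closing) the needed SSH and Docker clients.
        """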
        self.log.debug('Starting to kill/stop hosts')
        # Kill processing hosts
        for host_data in self.data_hosts:
            if host_data.needs_ssh:
                host_data.create_client(client_type='ssh')
                assert host_data.ssh_client
            if host_data.needs_docker:
                host_data.create_client(client_type='docker')
                assert host_data.docker_client

            self.log.debug(f'Killing/Stopping processing workers on host: {host_data.address}')
            self.kill_processing_workers(host_data)

            self.log.debug(f'Killing/Stopping processor servers on host: {host_data.address}')
            self.kill_processor_servers(host_data)

            if host_data.ssh_client:
                host_data.ssh_client.close()
                host_data.ssh_client = None
            if host_data.docker_client:
                host_data.docker_client.close()
                host_data.docker_client = None

    # TODO: Optimize the code duplication from start_* and kill_* methods
    def kill_processing_workers(self, host_data: DataHost) -> None:
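        """Stop all Processing Workers deployed on `host_data`: native workers are
        killed via SSH by PID, dockerized workers are stopped via their container id.
        """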
        amount = len(host_data.data_workers)
        if not amount:
            self.log.info('No active processing workers to be stopped.')
            return
        self.log.info(f"Trying to stop {amount} processing workers:")
        for worker in host_data.data_workers:
            if not worker.pid:
                continue
            if worker.deploy_type == DeployType.NATIVE:
                host_data.ssh_client.exec_command(f'kill {worker.pid}')
                self.log.info(f"Stopped native worker with pid: '{worker.pid}'")
            elif worker.deploy_type == DeployType.DOCKER:
                host_data.docker_client.containers.get(worker.pid).stop()
                self.log.info(f"Stopped docker worker with container id: '{worker.pid}'")
        host_data.data_workers = []

    def kill_processor_servers(self, host_data: DataHost) -> None:
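        """Stop all Processor Servers deployed on `host_data`: native servers are
        killed via SSH by PID, dockerized servers are stopped via their container id.
        """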
        amount = len(host_data.data_servers)
        if not amount:
            self.log.info('No active processor servers to be stopped.')
            return
        self.log.info(f"Trying to stop {amount} processor servers:")
        for server in host_data.data_servers:
            if not server.pid:
                continue
            if server.deploy_type == DeployType.NATIVE:
                host_data.ssh_client.exec_command(f'kill {server.pid}')
                self.log.info(f"Stopped native server with pid: '{server.pid}'")
            elif server.deploy_type == DeployType.DOCKER:
                host_data.docker_client.containers.get(server.pid).stop()
                self.log.info(f"Stopped docker server with container id: '{server.pid}'")
        host_data.data_servers = []

    def start_native_processor(
            self,
            ssh_client,
            processor_name: str,
            queue_url: str,
            database_url: str
    ) -> str:
        """Start a processor natively on a host via SSH.

        Args:
            ssh_client:         paramiko SSHClient to execute commands on a host
            processor_name:     name of processor to run
            queue_url:          url to rabbitmq
            database_url:       url to database

        Returns:
            str: pid of running process
        """
        self.log.info(f'Starting native processing worker: {processor_name}')
        channel = ssh_client.invoke_shell()
        stdin, stdout = channel.makefile('wb'), channel.makefile('rb')
        cmd = f'{processor_name} --type worker --database {database_url} --queue {queue_url}'
        # The only way (I could find) to start a process in the background and return early
        # is this construction. The pid of the last started background process is printed
        # with `echo $!`, but it is printed in between other output. Because of that I added
        # `xyz` before and after it to easily filter out the pid via regex when returning
        # from the function.
        log_path = '/tmp/ocrd-processing-server-startup.log'
        stdin.write(f"echo starting processing worker with '{cmd}' >> '{log_path}'\n")
        stdin.write(f'{cmd} >> {log_path} 2>&1 &\n')
        stdin.write('echo xyz$!xyz \n exit \n')
        output = stdout.read().decode('utf-8')
        stdout.close()
        stdin.close()
        return re_search(r'xyz([0-9]+)xyz', output).group(1)  # type: ignore

    def start_docker_processor(
            self,
            docker_client,
            processor_name: str,
            _queue_url: str,
            _database_url: str
    ) -> str:
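        """Start a processing worker in a Docker container on a host.

        Note: currently a placeholder; it only starts a dummy container and ignores
        the queue and database URLs (hence the underscore-prefixed arguments).
        """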
        # TODO: Raise an exception here as well?
        #  raise Exception("Deploying docker processing worker is not supported yet!")

        self.log.info(f'Starting docker container processor: {processor_name}')
        # TODO: add the real command to start the processing server in docker here
        res = docker_client.containers.run('debian', 'sleep 500s', detach=True, remove=True)
        assert res and res.id, f'Running processor: {processor_name} in docker-container failed'
        return res.id

    # TODO: Just a copy of the above start_native_processor() method.
    #  Far from being great... But should be good as a starting point
    def start_native_processor_server(
            self,
            ssh_client,
            processor_name: str,
            agent_address: str,
            database_url: str
    ) -> str:
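        """Start a Processor Server natively on a host via SSH and return its pid.

        `agent_address` is the `host:port` the server should listen on.
        """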
        self.log.info(f"Starting native processor server: {processor_name} on {agent_address}")
        channel = ssh_client.invoke_shell()
        stdin, stdout = channel.makefile('wb'), channel.makefile('rb')
        cmd = f'{processor_name} --type server --address {agent_address} --database {database_url}'
        port = agent_address.split(':')[1]
        log_path = f'/tmp/server_{processor_name}_{port}_{getpid()}.log'
        # TODO: This entire stdin/stdout thing is broken with servers!
        stdin.write(f"echo starting processor server with '{cmd}' >> '{log_path}'\n")
        stdin.write(f'{cmd} >> {log_path} 2>&1 &\n')
        stdin.write('echo xyz$!xyz \n exit \n')
        output = stdout.read().decode('utf-8')
        stdout.close()
        stdin.close()
        return re_search(r'xyz([0-9]+)xyz', output).group(1)  # type: ignore