Passed
Pull Request — master (#974)
by Konstantin
02:57
created

ocrd_network.deployer.Deployer.deploy_hosts()   C

Complexity

Conditions 9

Size

Total Lines 28
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 23
dl 0
loc 28
rs 6.6666
c 0
b 0
f 0
cc 9
nop 3
1
"""
2
Abstraction of the deployment functionality for processors.
3
4
The Processing Server provides the configuration parameters to the Deployer agent.
5
The Deployer agent runs the RabbitMQ Server, MongoDB and the Processing Hosts.
6
Each Processing Host may have several Processing Workers.
7
Each Processing Worker is an instance of an OCR-D processor.
8
"""
9
10
from __future__ import annotations
11
from typing import Dict, Union
12
from paramiko import SSHClient
13
from re import search as re_search
14
from time import sleep
15
16
17
from ocrd_utils import getLogger
18
from .deployment_config import *
19
from .deployment_utils import (
20
    create_docker_client,
21
    create_ssh_client,
22
    CustomDockerClient,
23
    DeployType,
24
    HostData,
25
)
26
from .rabbitmq_utils import RMQPublisher
27
28
29
class Deployer:
    """Deployment manager of the Processing Server.

    The Deployer is the agent that performs the actual deployment work.
    :py:attr:`config` merely mirrors the parsed configuration file, and
    :py:attr:`hosts` tracks per-host processor information — neither does
    any processing itself.
    """

    def __init__(self, config: ProcessingServerConfig) -> None:
        """
        Args:
            config (:py:class:`ProcessingServerConfig`): parsed configuration of the Processing Server
        """
        self.log = getLogger(__name__)
        self.config = config
        self.hosts = HostData.from_config(config.hosts)
        # Identifiers of the services we started (docker container ids);
        # None until the corresponding deploy_* method has run.
        self.mq_pid = None
        self.mongo_pid = None
47
48
    def kill_all(self) -> None:
49
        """ kill all started services: workers, database, queue
50
51
        The order of killing is important to optimize graceful shutdown in the future. If RabbitMQ
52
        server is killed before killing Processing Workers, that may have bad outcome and leave
53
        Processing Workers in an unpredictable state
54
        """
55
        self.kill_hosts()
56
        self.kill_mongodb()
57
        self.kill_rabbitmq()
58
59
    def deploy_hosts(self, rabbitmq_url: str, mongodb_url: str) -> None:
60
        for host in self.hosts:
61
            self.log.debug(f'Deploying processing workers on host: {host.config.address}')
62
63
            if (any(p.deploy_type == DeployType.native for p in host.config.processors)
64
                    and not host.ssh_client):
65
                host.ssh_client = create_ssh_client(
66
                    host.config.address,
67
                    host.config.username,
68
                    host.config.password,
69
                    host.config.keypath
70
                )
71
            if (any(p.deploy_type == DeployType.docker for p in host.config.processors)
72
                    and not host.docker_client):
73
                host.docker_client = create_docker_client(
74
                    host.config.address,
75
                    host.config.username,
76
                    host.config.password,
77
                    host.config.keypath
78
                )
79
80
            for processor in host.config.processors:
81
                self._deploy_processing_worker(processor, host, rabbitmq_url, mongodb_url)
82
83
            if host.ssh_client:
84
                host.ssh_client.close()
85
            if host.docker_client:
86
                host.docker_client.close()
87
88
    def _deploy_processing_worker(self, processor: WorkerConfig, host: HostData,
89
                                  rabbitmq_url: str, mongodb_url: str) -> None:
90
91
        self.log.debug(f"deploy '{processor.deploy_type}' processor: '{processor}' on '{host.config.address}'")
92
93
        for _ in range(processor.count):
94
            if processor.deploy_type == DeployType.native:
95
                assert host.ssh_client  # to satisfy mypy
96
                pid = self.start_native_processor(
97
                    client=host.ssh_client,
98
                    processor_name=processor.name,
99
                    queue_url=rabbitmq_url,
100
                    database_url=mongodb_url,
101
                )
102
                host.pids_native.append(pid)
103
            else:
104
                assert processor.deploy_type == DeployType.docker
105
                assert host.docker_client  # to satisfy mypy
106
                pid = self.start_docker_processor(
107
                    client=host.docker_client,
108
                    processor_name=processor.name,
109
                    queue_url=rabbitmq_url,
110
                    database_url=mongodb_url
111
                )
112
                host.pids_docker.append(pid)
113
            sleep(0.1)
114
115
    def deploy_rabbitmq(self, image: str, detach: bool, remove: bool,
116
                        ports_mapping: Union[Dict, None] = None) -> str:
117
        """Start docker-container with rabbitmq
118
119
        This method deploys the RabbitMQ Server. Handling of creation of queues, submitting messages
120
        to queues, and receiving messages from queues is part of the RabbitMQ Library which is part
121
        of the OCR-D WebAPI implementation.
122
        """
123
        self.log.debug(f"Trying to deploy '{image}', with modes: "
124
                       f"detach='{detach}', remove='{remove}'")
125
126
        if not self.config or not self.config.queue.address:
127
            raise ValueError('Deploying RabbitMQ has failed - missing configuration.')
128
129
        client = create_docker_client(self.config.queue.address, self.config.queue.username,
130
                                      self.config.queue.password, self.config.queue.keypath)
131
        if not ports_mapping:
132
            # 5672, 5671 - used by AMQP 0-9-1 and AMQP 1.0 clients without and with TLS
133
            # 15672, 15671: HTTP API clients, management UI and rabbitmq admin, without and with TLS
134
            # 25672: used for internode and CLI tools communication and is allocated from
135
            # a dynamic range (limited to a single port by default, computed as AMQP port + 20000)
136
            ports_mapping = {
137
                5672: self.config.queue.port,
138
                15672: 15672,
139
                25672: 25672
140
            }
141
        res = client.containers.run(
142
            image=image,
143
            detach=detach,
144
            remove=remove,
145
            ports=ports_mapping,
146
            # The default credentials to be used by the processing workers
147
            environment=[
148
                f'RABBITMQ_DEFAULT_USER={self.config.queue.credentials[0]}',
149
                f'RABBITMQ_DEFAULT_PASS={self.config.queue.credentials[1]}'
150
            ]
151
        )
152
        assert res and res.id, \
153
            f'Failed to start RabbitMQ docker container on host: {self.config.mongo.address}'
154
        self.mq_pid = res.id
155
        client.close()
156
157
        # Build the RabbitMQ Server URL to return
158
        rmq_host = self.config.queue.address
159
        # note, integer validation is already performed
160
        rmq_port = int(self.config.queue.port)
161
        # the default virtual host since no field is
162
        # provided in the processing server config.yml
163
        rmq_vhost = '/'
164
165
        self.wait_for_rabbitmq_availability(rmq_host, rmq_port, rmq_vhost,
166
                                            self.config.queue.credentials[0],
167
                                            self.config.queue.credentials[1])
168
169
        rabbitmq_hostinfo = f'{rmq_host}:{rmq_port}{rmq_vhost}'
170
        self.log.info(f'The RabbitMQ server was deployed on host: {rabbitmq_hostinfo}')
171
        return rabbitmq_hostinfo
172
173
    def wait_for_rabbitmq_availability(self, host: str, port: int, vhost: str, username: str,
                                       password: str) -> None:
        """Block until the RabbitMQ server accepts an authenticated connection.

        Makes up to 15 attempts with a 2 second pause after each failure.

        Raises:
            RuntimeError: when the server never became reachable
        """
        for _attempt in range(15):
            try:
                dummy_publisher = RMQPublisher(host=host, port=port, vhost=vhost)
                dummy_publisher.authenticate_and_connect(username=username, password=password)
            except Exception:
                sleep(2)
            else:
                # TODO: Disconnect the dummy_publisher here before returning...
                return
        raise RuntimeError('Error waiting for queue startup: timeout exceeded')
187
188
    def deploy_mongodb(self, image: str, detach: bool, remove: bool,
189
                       ports_mapping: Union[Dict, None] = None) -> str:
190
        """ Start mongodb in docker
191
        """
192
        self.log.debug(f"Trying to deploy '{image}', with modes: "
193
                       f"detach='{detach}', remove='{remove}'")
194
195
        if not self.config or not self.config.mongo.address:
196
            raise ValueError('Deploying MongoDB has failed - missing configuration.')
197
198
        client = create_docker_client(self.config.mongo.address, self.config.mongo.username,
199
                                      self.config.mongo.password, self.config.mongo.keypath)
200
        if not ports_mapping:
201
            ports_mapping = {
202
                27017: self.config.mongo.port
203
            }
204
        res = client.containers.run(
205
            image=image,
206
            detach=detach,
207
            remove=remove,
208
            ports=ports_mapping
209
        )
210
        if not res or not res.id:
211
            raise RuntimeError('Failed to start MongoDB docker container on host: '
212
                               f'{self.config.mongo.address}')
213
        self.mongo_pid = res.id
214
        client.close()
215
216
        mongodb_hostinfo = f'{self.config.mongo.address}:{self.config.mongo.port}'
217
        self.log.info(f'The MongoDB was deployed on host: {mongodb_hostinfo}')
218
        return mongodb_hostinfo
219
220
    def kill_rabbitmq(self) -> None:
221
        if not self.mq_pid:
222
            self.log.warning('No running RabbitMQ instance found')
223
            return
224
        client = create_docker_client(self.config.queue.address, self.config.queue.username,
225
                                      self.config.queue.password, self.config.queue.keypath)
226
        client.containers.get(self.mq_pid).stop()
227
        self.mq_pid = None
228
        client.close()
229
        self.log.info('The RabbitMQ is stopped')
230
231
    def kill_mongodb(self) -> None:
232
        if not self.mongo_pid:
233
            self.log.warning('No running MongoDB instance found')
234
            return
235
        client = create_docker_client(self.config.mongo.address, self.config.mongo.username,
236
                                      self.config.mongo.password, self.config.mongo.keypath)
237
        client.containers.get(self.mongo_pid).stop()
238
        self.mongo_pid = None
239
        client.close()
240
        self.log.info('The MongoDB is stopped')
241
242
    def kill_hosts(self) -> None:
243
        self.log.debug('Starting to kill/stop hosts')
244
        # Kill processing hosts
245
        for host in self.hosts:
246
            self.log.debug(f'Killing/Stopping processing workers on host: {host.config.address}')
247
            if host.ssh_client:
248
                host.ssh_client = create_ssh_client(host.config.address, host.config.username,
249
                                                    host.config.password, host.config.keypath)
250
            if host.docker_client:
251
                host.docker_client = create_docker_client(host.config.address, host.config.username,
252
                                                          host.config.password, host.config.keypath)
253
            # Kill deployed OCR-D processor instances on this Processing worker host
254
            self.kill_processing_worker(host)
255
256
    def kill_processing_worker(self, host: HostData) -> None:
257
        for pid in host.pids_native:
258
            self.log.debug(f"Trying to kill/stop native processor: with PID: '{pid}'")
259
            host.ssh_client.exec_command(f'kill {pid}')
260
        host.pids_native = []
261
262
        for pid in host.pids_docker:
263
            self.log.debug(f"Trying to kill/stop docker container with PID: '{pid}'")
264
            host.docker_client.containers.get(pid).stop()
265
        host.pids_docker = []
266
267
    def start_native_processor(self, client: SSHClient, processor_name: str, queue_url: str,
268
                               database_url: str) -> str:
269
        """ start a processor natively on a host via ssh
270
271
        Args:
272
            client:             paramiko SSHClient to execute commands on a host
273
            processor_name:     name of processor to run
274
            queue_url:          url to rabbitmq
275
            database_url:       url to database
276
277
        Returns:
278
            str: pid of running process
279
        """
280
        self.log.info(f'Starting native processor: {processor_name}')
281
        channel = client.invoke_shell()
282
        stdin, stdout = channel.makefile('wb'), channel.makefile('rb')
283
        cmd = f'{processor_name} --database {database_url} --queue {queue_url}'
284
        # the only way (I could find) to make it work to start a process in the background and
285
        # return early is this construction. The pid of the last started background process is
286
        # printed with `echo $!` but it is printed inbetween other output. Because of that I added
287
        # `xyz` before and after the code to easily be able to filter out the pid via regex when
288
        # returning from the function
289
        logpath = '/tmp/ocrd-processing-server-startup.log'
290
        stdin.write(f"echo starting processor with '{cmd}' >> '{logpath}'\n")
291
        stdin.write(f'{cmd} >> {logpath} 2>&1 &\n')
292
        stdin.write('echo xyz$!xyz \n exit \n')
293
        output = stdout.read().decode('utf-8')
294
        stdout.close()
295
        stdin.close()
296
        return re_search(r'xyz([0-9]+)xyz', output).group(1)  # type: ignore
297
298
    def start_docker_processor(self, client: CustomDockerClient, processor_name: str,
299
                               queue_url: str, database_url: str) -> str:
300
        self.log.info(f'Starting docker container processor: {processor_name}')
301
        # TODO: add real command here to start processing server in docker here
302
        res = client.containers.run('debian', 'sleep 500s', detach=True, remove=True)
303
        assert res and res.id, f'Running processor: {processor_name} in docker-container failed'
304
        return res.id
305