"""
Abstraction of the deployment functionality for processors.

The Processing Server provides the configuration parameters to the Deployer agent.
The Deployer agent runs the RabbitMQ Server, MongoDB and the Processing Hosts.
Each Processing Host may have several Processing Workers.
Each Processing Worker is an instance of an OCR-D processor.
"""
from __future__ import annotations
from typing import Dict, List, Union
from re import search as re_search
from os import getpid
from time import sleep

from ocrd_utils import getLogger

from .deployment_utils import (
    create_docker_client,
    DeployType,
    verify_mongodb_available,
    verify_rabbitmq_available,
)

from .runtime_data import (
    DataHost,
    DataMongoDB,
    DataProcessingWorker,
    DataProcessorServer,
    DataRabbitMQ
)
from .utils import validate_and_load_config


class Deployer:
    def __init__(self, config_path: str) -> None:
        self.log = getLogger(__name__)
        config = validate_and_load_config(config_path)

        self.data_mongo: DataMongoDB = DataMongoDB(config['database'])
        self.data_queue: DataRabbitMQ = DataRabbitMQ(config['process_queue'])
        self.data_hosts: List[DataHost] = []
        for config_host in config['hosts']:
            self.data_hosts.append(DataHost(config_host))

    # TODO: Reconsider this.
    def find_matching_processors(
            self,
            worker_only: bool = False,
            server_only: bool = False,
            docker_only: bool = False,
            native_only: bool = False,
            str_names_only: bool = False,
            unique_only: bool = False
    ) -> Union[List[str], List[object]]:
        """Finds and returns a list of matching data objects of type:
        `DataProcessingWorker` and `DataProcessorServer`.

        :py:attr:`worker_only` match only processors with worker status
        :py:attr:`server_only` match only processors with server status
        :py:attr:`docker_only` match only docker processors
        :py:attr:`native_only` match only native processors
        :py:attr:`str_names_only` returns the processor_name instead of the data object
        :py:attr:`unique_only` remove duplicates from the matches

        `worker_only` and `server_only` are mutually exclusive to each other
        `docker_only` and `native_only` are mutually exclusive to each other
        `unique_only` is allowed only together with `str_names_only`
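
        Example (illustrative; all flags shown are the keyword arguments above):
            `find_matching_processors(worker_only=True, native_only=True,
            str_names_only=True, unique_only=True)` returns the unique names
            of all natively deployed processing workers.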
        """

        if worker_only and server_only:
            raise ValueError("Only 'worker_only' or 'server_only' is allowed, not both.")
        if docker_only and native_only:
            raise ValueError("Only 'docker_only' or 'native_only' is allowed, not both.")
        if not str_names_only and unique_only:
            raise ValueError("Value 'unique_only' is allowed only together with 'str_names_only'")

        # Find all matching objects of type:
        # DataProcessingWorker or DataProcessorServer
        matched_objects = []
        for data_host in self.data_hosts:
            if not server_only:
                for data_worker in data_host.data_workers:
                    if data_worker.deploy_type == DeployType.NATIVE and docker_only:
                        continue
                    if data_worker.deploy_type == DeployType.DOCKER and native_only:
                        continue
                    matched_objects.append(data_worker)
            if not worker_only:
                for data_server in data_host.data_servers:
                    if data_server.deploy_type == DeployType.NATIVE and docker_only:
                        continue
                    if data_server.deploy_type == DeployType.DOCKER and native_only:
                        continue
                    matched_objects.append(data_server)
        if str_names_only:
            # gets only the processor names of the matched objects
            name_list = [match.processor_name for match in matched_objects]
            if unique_only:
                # removes the duplicates, if any
                return list(dict.fromkeys(name_list))
            return name_list
        return matched_objects

    def resolve_processor_server_url(self, processor_name: str) -> str:
        """Get the URL of the processor server for `processor_name`,
        or an empty string if none is configured."""
        processor_server_url = ''
        for data_host in self.data_hosts:
            for data_server in data_host.data_servers:
                if data_server.processor_name == processor_name:
                    processor_server_url = f'http://{data_host.address}:{data_server.port}/'
        return processor_server_url

    def kill_all(self) -> None:
        """Kill all started services: hosts, database, queue.

        The order of killing matters and should allow for a graceful shutdown in the
        future. If the RabbitMQ server were killed before the Processing Workers, the
        workers could be left in an unpredictable state.
        """
        self.kill_hosts()
        self.kill_mongodb()
        self.kill_rabbitmq()

    def deploy_hosts(
            self,
            mongodb_url: str,
            rabbitmq_url: str
    ) -> None:
        for host_data in self.data_hosts:
            if host_data.needs_ssh:
                host_data.create_client(client_type='ssh')
                assert host_data.ssh_client
            if host_data.needs_docker:
                host_data.create_client(client_type='docker')
                assert host_data.docker_client

            self.log.debug(f'Deploying processing workers on host: {host_data.address}')
            for data_worker in host_data.data_workers:
                self._deploy_processing_worker(
                    mongodb_url,
                    rabbitmq_url,
                    host_data,
                    data_worker
                )

            self.log.debug(f'Deploying processor servers on host: {host_data.address}')
            for data_server in host_data.data_servers:
                self._deploy_processor_server(
                    mongodb_url,
                    host_data,
                    data_server
                )

            if host_data.ssh_client:
                host_data.ssh_client.close()
                host_data.ssh_client = None
            if host_data.docker_client:
                host_data.docker_client.close()
                host_data.docker_client = None

    def _deploy_processing_worker(
            self,
            mongodb_url: str,
            rabbitmq_url: str,
            host_data: DataHost,
            data_worker: DataProcessingWorker
    ) -> None:
        self.log.debug(f"Deploying processing worker, "
                       f"environment: '{data_worker.deploy_type}', "
                       f"name: '{data_worker.processor_name}', "
                       f"address: '{host_data.address}'")

        if data_worker.deploy_type == DeployType.NATIVE:
            assert host_data.ssh_client  # to satisfy mypy
            pid = self.start_native_processor(
                ssh_client=host_data.ssh_client,
                processor_name=data_worker.processor_name,
                queue_url=rabbitmq_url,
                database_url=mongodb_url,
            )
            data_worker.pid = pid
        elif data_worker.deploy_type == DeployType.DOCKER:
            assert host_data.docker_client  # to satisfy mypy
            pid = self.start_docker_processor(
                docker_client=host_data.docker_client,
                processor_name=data_worker.processor_name,
                _queue_url=rabbitmq_url,
                _database_url=mongodb_url
            )
            data_worker.pid = pid
        sleep(0.2)

    # TODO: Revisit this to remove code duplications of deploy_* methods
    def _deploy_processor_server(
            self,
            mongodb_url: str,
            host_data: DataHost,
            data_server: DataProcessorServer,
    ) -> None:
        self.log.debug(f"Deploying processor server, "
                       f"environment: '{data_server.deploy_type}', "
                       f"name: '{data_server.processor_name}', "
                       f"address: '{data_server.host}:{data_server.port}'")

        if data_server.deploy_type == DeployType.NATIVE:
            assert host_data.ssh_client
            pid = self.start_native_processor_server(
                ssh_client=host_data.ssh_client,
                processor_name=data_server.processor_name,
                agent_address=f'{data_server.host}:{data_server.port}',
                database_url=mongodb_url,
            )
            data_server.pid = pid

            # Keep track of the ports on which servers for this processor run
            if data_server.processor_name in host_data.server_ports:
                name = data_server.processor_name
                port = data_server.port
                if host_data.server_ports[name]:
                    # list.append() mutates in place and returns None,
                    # so its result must not be assigned back
                    host_data.server_ports[name].append(port)
                else:
                    host_data.server_ports[name] = [port]
            else:
                host_data.server_ports[data_server.processor_name] = [data_server.port]
        elif data_server.deploy_type == DeployType.DOCKER:
            raise Exception("Deploying docker processor server is not supported yet!")

    def deploy_rabbitmq(
            self,
            image: str,
            detach: bool,
            remove: bool,
            ports_mapping: Union[Dict, None] = None
    ) -> str:
        if self.data_queue.skip_deployment:
            self.log.debug('RabbitMQ is externally managed. Skipping deployment')
            verify_rabbitmq_available(
                self.data_queue.address,
                self.data_queue.port,
                self.data_queue.vhost,
                self.data_queue.username,
                self.data_queue.password
            )
            return self.data_queue.url
        self.log.debug(f"Trying to deploy '{image}', with modes: "
                       f"detach='{detach}', remove='{remove}'")

        if not self.data_queue or not self.data_queue.address:
            raise ValueError('Deploying RabbitMQ has failed - missing configuration.')

        client = create_docker_client(
            self.data_queue.address,
            self.data_queue.ssh_username,
            self.data_queue.ssh_password,
            self.data_queue.ssh_keypath
        )
        if not ports_mapping:
            # 5672, 5671 - used by AMQP 0-9-1 and AMQP 1.0 clients without and with TLS
            # 15672, 15671: HTTP API clients, management UI and rabbitmqadmin, without and with TLS
            # 25672: used for internode and CLI tools communication and is allocated from
            # a dynamic range (limited to a single port by default, computed as AMQP port + 20000)
            ports_mapping = {
                5672: self.data_queue.port,
                15672: 15672,
                25672: 25672
            }
        res = client.containers.run(
            image=image,
            detach=detach,
            remove=remove,
            ports=ports_mapping,
            # The default credentials to be used by the processing workers
            environment=[
                f'RABBITMQ_DEFAULT_USER={self.data_queue.username}',
                f'RABBITMQ_DEFAULT_PASS={self.data_queue.password}'
            ]
        )
        assert res and res.id, \
            f'Failed to start RabbitMQ docker container on host: {self.data_queue.address}'
        self.data_queue.pid = res.id
        client.close()

        rmq_host = self.data_queue.address
        rmq_port = int(self.data_queue.port)
        rmq_vhost = '/'

        verify_rabbitmq_available(
            host=rmq_host,
            port=rmq_port,
            vhost=rmq_vhost,
            username=self.data_queue.username,
            password=self.data_queue.password
        )
        self.log.info(f'The RabbitMQ server was deployed on URL: '
                      f'{rmq_host}:{rmq_port}{rmq_vhost}')
        return self.data_queue.url

    def deploy_mongodb(
            self,
            image: str,
            detach: bool,
            remove: bool,
            ports_mapping: Union[Dict, None] = None
    ) -> str:
        if self.data_mongo.skip_deployment:
            self.log.debug('MongoDB is externally managed. Skipping deployment')
            verify_mongodb_available(self.data_mongo.url)
            return self.data_mongo.url

        self.log.debug(f"Trying to deploy '{image}', with modes: "
                       f"detach='{detach}', remove='{remove}'")

        if not self.data_mongo or not self.data_mongo.address:
            raise ValueError('Deploying MongoDB has failed - missing configuration.')

        client = create_docker_client(
            self.data_mongo.address,
            self.data_mongo.ssh_username,
            self.data_mongo.ssh_password,
            self.data_mongo.ssh_keypath
        )
        if not ports_mapping:
            ports_mapping = {
                27017: self.data_mongo.port
            }
        if self.data_mongo.username:
            environment = [
                f'MONGO_INITDB_ROOT_USERNAME={self.data_mongo.username}',
                f'MONGO_INITDB_ROOT_PASSWORD={self.data_mongo.password}'
            ]
        else:
            environment = []

        res = client.containers.run(
            image=image,
            detach=detach,
            remove=remove,
            ports=ports_mapping,
            environment=environment
        )
        if not res or not res.id:
            raise RuntimeError('Failed to start MongoDB docker container on host: '
                               f'{self.data_mongo.address}')
        self.data_mongo.pid = res.id
        client.close()

        mongodb_hostinfo = f'{self.data_mongo.address}:{self.data_mongo.port}'
        self.log.info(f'The MongoDB was deployed on host: {mongodb_hostinfo}')
        return self.data_mongo.url

    def kill_rabbitmq(self) -> None:
        if self.data_queue.skip_deployment:
            return
        elif not self.data_queue.pid:
            self.log.warning('No running RabbitMQ instance found')
            return
        client = create_docker_client(
            self.data_queue.address,
            self.data_queue.ssh_username,
            self.data_queue.ssh_password,
            self.data_queue.ssh_keypath
        )
        client.containers.get(self.data_queue.pid).stop()
        self.data_queue.pid = None
        client.close()
        self.log.info('The RabbitMQ server is stopped')

    def kill_mongodb(self) -> None:
        if self.data_mongo.skip_deployment:
            return
        elif not self.data_mongo.pid:
            self.log.warning('No running MongoDB instance found')
            return
        client = create_docker_client(
            self.data_mongo.address,
            self.data_mongo.ssh_username,
            self.data_mongo.ssh_password,
            self.data_mongo.ssh_keypath
        )
        client.containers.get(self.data_mongo.pid).stop()
        self.data_mongo.pid = None
        client.close()
        self.log.info('The MongoDB server is stopped')

    def kill_hosts(self) -> None:
        self.log.debug('Starting to kill/stop hosts')
        # Kill processing hosts
        for host_data in self.data_hosts:
            if host_data.needs_ssh:
                host_data.create_client(client_type='ssh')
                assert host_data.ssh_client
            if host_data.needs_docker:
                host_data.create_client(client_type='docker')
                assert host_data.docker_client

            self.log.debug(f'Killing/Stopping processing workers on host: {host_data.address}')
            self.kill_processing_workers(host_data)

            self.log.debug(f'Killing/Stopping processor servers on host: {host_data.address}')
            self.kill_processor_servers(host_data)

            if host_data.ssh_client:
                host_data.ssh_client.close()
                host_data.ssh_client = None
            if host_data.docker_client:
                host_data.docker_client.close()
                host_data.docker_client = None

    # TODO: Optimize the code duplication from start_* and kill_* methods
    def kill_processing_workers(self, host_data: DataHost) -> None:
        amount = len(host_data.data_workers)
        if not amount:
            self.log.info('No active processing workers to be stopped.')
            return
        self.log.info(f"Trying to stop {amount} processing workers:")
        for worker in host_data.data_workers:
            if not worker.pid:
                continue
            if worker.deploy_type == DeployType.NATIVE:
                host_data.ssh_client.exec_command(f'kill {worker.pid}')
                self.log.info(f"Stopped native worker with pid: '{worker.pid}'")
            elif worker.deploy_type == DeployType.DOCKER:
                host_data.docker_client.containers.get(worker.pid).stop()
                self.log.info(f"Stopped docker worker with container id: '{worker.pid}'")
        host_data.data_workers = []

    def kill_processor_servers(self, host_data: DataHost) -> None:
        amount = len(host_data.data_servers)
        if not amount:
            self.log.info('No active processor servers to be stopped.')
            return
        self.log.info(f"Trying to stop {amount} processor servers:")
        for server in host_data.data_servers:
            if not server.pid:
                continue
            if server.deploy_type == DeployType.NATIVE:
                host_data.ssh_client.exec_command(f'kill {server.pid}')
                self.log.info(f"Stopped native server with pid: '{server.pid}'")
            elif server.deploy_type == DeployType.DOCKER:
                host_data.docker_client.containers.get(server.pid).stop()
                self.log.info(f"Stopped docker server with container id: '{server.pid}'")
        host_data.data_servers = []

    def start_native_processor(
            self,
            ssh_client,
            processor_name: str,
            queue_url: str,
            database_url: str
    ) -> str:
        """Start a processor natively on a host via SSH.

        Args:
            ssh_client: paramiko SSHClient to execute commands on a host
            processor_name: name of processor to run
            queue_url: url to rabbitmq
            database_url: url to database

        Returns:
            str: pid of running process
        """
        self.log.info(f'Starting native processing worker: {processor_name}')
        channel = ssh_client.invoke_shell()
        stdin, stdout = channel.makefile('wb'), channel.makefile('rb')
        cmd = f'{processor_name} --type worker --database {database_url} --queue {queue_url}'
        # This construction is the only one found that starts a process in the
        # background and returns early. The pid of the last started background
        # process is printed with `echo $!`, but it appears in between other output.
        # The `xyz` markers around it allow the pid to be filtered out via regex
        # when returning from the function.
        log_path = '/tmp/ocrd-processing-server-startup.log'
        stdin.write(f"echo starting processing worker with '{cmd}' >> '{log_path}'\n")
        stdin.write(f'{cmd} >> {log_path} 2>&1 &\n')
        stdin.write('echo xyz$!xyz \n exit \n')
        output = stdout.read().decode('utf-8')
        stdout.close()
        stdin.close()
        return re_search(r'xyz([0-9]+)xyz', output).group(1)  # type: ignore

    def start_docker_processor(
            self,
            docker_client,
            processor_name: str,
            _queue_url: str,
            _database_url: str
    ) -> str:
        # TODO: Raise an exception here as well?
        # raise Exception("Deploying docker processing worker is not supported yet!")

        self.log.info(f'Starting docker container processor: {processor_name}')
        # TODO: add the real command to start the processing worker in docker here
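        # A possible shape of that call, sketched by analogy with
        # start_native_processor (the 'ocrd/...' image naming below is a
        # hypothetical assumption, not a convention established in this codebase):
        #   res = docker_client.containers.run(
        #       image=f'ocrd/{processor_name}',
        #       command=f'{processor_name} --type worker --queue {_queue_url} --database {_database_url}',
        #       detach=True, remove=True)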
        res = docker_client.containers.run('debian', 'sleep 500s', detach=True, remove=True)
        assert res and res.id, f'Running processor: {processor_name} in docker-container failed'
        return res.id

    # TODO: Just a copy of the above start_native_processor() method.
    # Far from being great... But should be good as a starting point
    def start_native_processor_server(
            self,
            ssh_client,
            processor_name: str,
            agent_address: str,
            database_url: str
    ) -> str:
        self.log.info(f"Starting native processor server: {processor_name} on {agent_address}")
        channel = ssh_client.invoke_shell()
        stdin, stdout = channel.makefile('wb'), channel.makefile('rb')
        cmd = f'{processor_name} --type server --address {agent_address} --database {database_url}'
        port = agent_address.split(':')[1]
        log_path = f'/tmp/server_{processor_name}_{port}_{getpid()}.log'
        # TODO: This entire stdin/stdout thing is broken with servers!
        stdin.write(f"echo starting processor server with '{cmd}' >> '{log_path}'\n")
        stdin.write(f'{cmd} >> {log_path} 2>&1 &\n')
        stdin.write('echo xyz$!xyz \n exit \n')
        output = stdout.read().decode('utf-8')
        stdout.close()
        stdin.close()
        return re_search(r'xyz([0-9]+)xyz', output).group(1)  # type: ignore