from datetime import datetime
from os import getpid
from pathlib import Path
from typing import Dict, List, Optional, Union
from uvicorn import run as uvicorn_run

from fastapi import APIRouter, FastAPI, File, HTTPException, Request, status, UploadFile
from fastapi.exceptions import RequestValidationError
from fastapi.responses import FileResponse, JSONResponse, PlainTextResponse

from ocrd.task_sequence import ProcessorTask
from ocrd_utils import initLogging, getLogger
from .constants import AgentType, JobState, ServerApiTags
from .database import (
    initiate_database,
    db_get_processing_job,
    db_get_processing_jobs,
    db_update_processing_job,
    db_update_workspace,
    db_get_workflow_script,
    db_find_first_workflow_script_by_content
)
from .runtime_data import Deployer
from .logging_utils import configure_file_handler_with_formatter, get_processing_server_logging_file_path
from .models import (
    DBProcessorJob,
    DBWorkflowJob,
    DBWorkflowScript,
    PYJobInput,
    PYJobOutput,
    PYResultMessage,
    PYWorkflowJobOutput
)
from .rabbitmq_utils import (
    check_if_queue_exists,
    connect_rabbitmq_publisher,
    create_message_queues,
    OcrdProcessingMessage
)
from .server_cache import CacheLockedPages, CacheProcessingRequests
from .server_utils import (
    create_processing_message,
    create_workspace_if_not_exists,
    forward_job_to_processor_server,
    _get_processor_job,
    _get_processor_job_log,
    get_page_ids_list,
    get_workflow_content,
    get_from_database_workspace,
    get_from_database_workflow_job,
    kill_mets_server_zombies,
    parse_workflow_tasks,
    raise_http_exception,
    request_processor_server_tool_json,
    validate_and_return_mets_path,
    validate_first_task_input_file_groups_existence,
    validate_job_input,
    validate_workflow
)
from .tcp_to_uds_mets_proxy import MetsServerProxy
from .utils import (
    load_ocrd_all_tool_json,
    expand_page_ids,
    generate_id,
    generate_workflow_content,
    generate_workflow_content_hash
)

class ProcessingServer(FastAPI):
    """FastAPI app to make ocr-d processor calls

    The Processing-Server receives calls conforming to the ocr-d webapi regarding the processing
    part. It can run ocrd-processors and provides endpoints to discover processors and watch the job
    status.
    The Processing-Server does not execute the processors itself but starts up a queue and a
    database to delegate the calls to processing workers. They are started by the Processing-Server
    and the communication goes through the queue.
    """

    def __init__(self, config_path: str, host: str, port: int) -> None:
        self.title = "OCR-D Processing Server"
        super().__init__(
            title=self.title,
            on_startup=[self.on_startup],
            on_shutdown=[self.on_shutdown],
            description="OCR-D Processing Server"
        )
        initLogging()
        self.log = getLogger("ocrd_network.processing_server")
        log_file = get_processing_server_logging_file_path(pid=getpid())
        configure_file_handler_with_formatter(self.log, log_file=log_file, mode="a")

        self.log.info("Loading ocrd-all-tool.json")
        self.ocrd_all_tool_json = load_ocrd_all_tool_json()
        self.hostname = host
        self.port = port

        # The deployer is used for:
        # - deploying agents when the Processing Server is started
        # - retrieving runtime data of agents
        self.deployer = Deployer(config_path)
        # Used for forwarding Mets Server TCP requests to UDS requests
        self.mets_server_proxy = MetsServerProxy()
        self.use_tcp_mets = self.deployer.use_tcp_mets
        # If set, all Mets Server UDS requests are multiplexed over TCP
        # Used by processing workers and/or processor servers to report back the results
        if self.deployer.internal_callback_url:
            host = self.deployer.internal_callback_url
            self.internal_job_callback_url = f"{host.rstrip('/')}/result_callback"
            self.multiplexing_endpoint = f"{host.rstrip('/')}/tcp_mets"
        else:
            self.internal_job_callback_url = f"http://{host}:{port}/result_callback"
            self.multiplexing_endpoint = f"http://{host}:{port}/tcp_mets"

        self.mongodb_url = None
        self.rabbitmq_url = None
        self.rmq_data = {
            "host": self.deployer.data_queue.host,
            "port": self.deployer.data_queue.port,
            "vhost": "/",
            "username": self.deployer.data_queue.cred_username,
            "password": self.deployer.data_queue.cred_password
        }

        # Gets assigned when `connect_rabbitmq_publisher()` is called on the working object
        self.rmq_publisher = None

        # Used for keeping track of cached processing requests
        self.cache_processing_requests = CacheProcessingRequests()

        # Used for keeping track of locked/unlocked pages of a workspace
        self.cache_locked_pages = CacheLockedPages()

        self.add_api_routes_others()
        self.add_api_routes_processing()
        self.add_api_routes_workflow()

        @self.exception_handler(RequestValidationError)
        async def validation_exception_handler(request: Request, exc: RequestValidationError):
            # Collapse newlines and runs of whitespace in the validation error text
            exc_str = f'{exc}'.replace('\n', ' ').replace('   ', ' ')
            self.log.error(f'{request}: {exc_str}')
            content = {'status_code': 10422, 'message': exc_str, 'data': None}
            return JSONResponse(content=content, status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)

    def start(self) -> None:
        """ deploy agents (db, queue, workers) and start the processing server with uvicorn
        """
        try:
            self.rabbitmq_url = self.deployer.deploy_rabbitmq()
            self.mongodb_url = self.deployer.deploy_mongodb()

            # The RMQPublisher is initialized and a connection to RabbitMQ is established
            self.rmq_publisher = connect_rabbitmq_publisher(self.log, self.rmq_data, enable_acks=True)

            queue_names = self.deployer.find_matching_network_agents(
                worker_only=True, str_names_only=True, unique_only=True
            )
            self.log.info(f"Creating message queues on RabbitMQ instance url: {self.rabbitmq_url}")
            create_message_queues(logger=self.log, rmq_publisher=self.rmq_publisher, queue_names=queue_names)

            self.deployer.deploy_network_agents(mongodb_url=self.mongodb_url, rabbitmq_url=self.rabbitmq_url)
        except Exception as error:
            self.log.exception(f"Failed to start the Processing Server, error: {error}")
            self.log.warning("Trying to stop previously deployed services and network agents.")
            self.deployer.stop_all()
            raise
        uvicorn_run(self, host=self.hostname, port=int(self.port))

    async def on_startup(self):
        self.log.info(f"Initializing the Database on: {self.mongodb_url}")
        await initiate_database(db_url=self.mongodb_url)

    async def on_shutdown(self) -> None:
        """
        - hosts and pids should be stored somewhere
        - ensure queue is empty or processor is not currently running
        - connect to hosts and kill pids
        """
        await self.stop_deployed_agents()

    def add_api_routes_others(self):
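        """Register utility routes: the home page, the /stop endpoint for shutting down
        deployed agents, the /tcp_mets proxy endpoint and the METS server zombie cleanup."""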
        others_router = APIRouter()
        others_router.add_api_route(
            path="/",
            endpoint=self.home_page,
            methods=["GET"],
            status_code=status.HTTP_200_OK,
            summary="Get information about the processing server"
        )
        others_router.add_api_route(
            path="/stop",
            endpoint=self.stop_deployed_agents,
            methods=["POST"],
            tags=[ServerApiTags.TOOLS],
            summary="Stop database, queue and processing-workers"
        )
        others_router.add_api_route(
            path="/tcp_mets",
            methods=["POST"],
            endpoint=self.forward_tcp_request_to_uds_mets_server,
            tags=[ServerApiTags.WORKSPACE],
            summary="Forward a TCP request to UDS mets server"
        )
        others_router.add_api_route(
            path="/kill_mets_server_zombies",
            endpoint=self.kill_mets_server_zombies,
            methods=["DELETE"],
            tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="!! Workaround Do Not Use Unless You Have A Reason "
                    "!! Kill all METS servers on this machine that have been created more than 60 minutes ago."
        )
        self.include_router(others_router)

    def add_api_routes_processing(self):
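        """Register the processing routes: processor discovery and ocrd-tool info,
        job submission, job status and log retrieval, and the result callback."""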
        processing_router = APIRouter()
        processing_router.add_api_route(
            path="/processor",
            endpoint=self.list_processors,
            methods=["GET"],
            tags=[ServerApiTags.PROCESSING, ServerApiTags.DISCOVERY],
            status_code=status.HTTP_200_OK,
            summary="Get a list of all available processors"
        )
        processing_router.add_api_route(
            path="/processor/info/{processor_name}",
            endpoint=self.get_network_agent_ocrd_tool,
            methods=["GET"],
            tags=[ServerApiTags.PROCESSING, ServerApiTags.DISCOVERY],
            status_code=status.HTTP_200_OK,
            summary="Get information about this processor"
        )
        processing_router.add_api_route(
            path="/processor/run/{processor_name}",
            endpoint=self.validate_and_forward_job_to_network_agent,
            methods=["POST"],
            tags=[ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Submit a job to this processor",
            response_model=PYJobOutput,
            response_model_exclude_unset=True,
            response_model_exclude_none=True
        )
        processing_router.add_api_route(
            path="/processor/job/{job_id}",
            endpoint=self.get_processor_job,
            methods=["GET"],
            tags=[ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Get information about a job based on its ID",
            response_model=PYJobOutput,
            response_model_exclude_unset=True,
            response_model_exclude_none=True
        )
        processing_router.add_api_route(
            path="/processor/log/{job_id}",
            endpoint=self.get_processor_job_log,
            methods=["GET"],
            tags=[ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Get the log file of a job id"
        )
        processing_router.add_api_route(
            path="/result_callback",
            endpoint=self.remove_job_from_request_cache,
            methods=["POST"],
            tags=[ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Callback used by a worker or processor server for reporting result of a processing request"
        )
        self.include_router(processing_router)

    def add_api_routes_workflow(self):
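        """Register the workflow routes: upload, download and replace workflow scripts,
        run a workflow, and query full or simplified workflow job status."""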
        workflow_router = APIRouter()
        workflow_router.add_api_route(
            path="/workflow",
            endpoint=self.upload_workflow,
            methods=["POST"],
            tags=[ServerApiTags.WORKFLOW],
            status_code=status.HTTP_201_CREATED,
            summary="Upload/Register a new workflow script"
        )
        workflow_router.add_api_route(
            path="/workflow/{workflow_id}",
            endpoint=self.download_workflow,
            methods=["GET"],
            tags=[ServerApiTags.WORKFLOW],
            status_code=status.HTTP_200_OK,
            summary="Download a workflow script"
        )
        workflow_router.add_api_route(
            path="/workflow/{workflow_id}",
            endpoint=self.replace_workflow,
            methods=["PUT"],
            tags=[ServerApiTags.WORKFLOW],
            status_code=status.HTTP_200_OK,
            summary="Update/Replace a workflow script"
        )
        workflow_router.add_api_route(
            path="/workflow/run",
            endpoint=self.run_workflow,
            methods=["POST"],
            tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Run a workflow",
            response_model=PYWorkflowJobOutput,
            response_model_exclude_defaults=True,
            response_model_exclude_unset=True,
            response_model_exclude_none=True
        )
        workflow_router.add_api_route(
            path="/workflow/job-simple/{workflow_job_id}",
            endpoint=self.get_workflow_info_simple,
            methods=["GET"],
            tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Get simplified overall job status"
        )
        workflow_router.add_api_route(
            path="/workflow/job/{workflow_job_id}",
            endpoint=self.get_workflow_info,
            methods=["GET"],
            tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Get information about a workflow run"
        )
        self.include_router(workflow_router)

    async def forward_tcp_request_to_uds_mets_server(self, request: Request) -> Dict:
        """Forward a mets-server request

        A processor calls a mets-related method such as add_file with ClientSideOcrdMets. This sends
        a request to this endpoint. The request contains all information necessary to make a call
        to the uds-mets-server. This information is used by `MetsServerProxy` to make the call
        to the UDS mets server, which is only reachable locally (local to the processing server).
        """
        request_body = await request.json()
        ws_dir_path = request_body["workspace_path"]
        self.deployer.start_uds_mets_server(ws_dir_path=ws_dir_path)
        return self.mets_server_proxy.forward_tcp_request(request_body=request_body)

    async def home_page(self):
        message = f"The home page of the {self.title}"
        json_message = {
            "message": message,
            "time": datetime.now().strftime("%Y-%m-%d %H:%M")
        }
        return json_message

    async def stop_deployed_agents(self) -> None:
        self.deployer.stop_all()

    def query_ocrd_tool_json_from_server(self, processor_name: str) -> Dict:
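        """Request the ocrd-tool.json from the Processor Server registered for the given processor name."""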
        processor_server_base_url = self.deployer.resolve_processor_server_url(processor_name)
        if processor_server_base_url == '':
            message = f"Processor Server URL of '{processor_name}' not found"
            raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message=message)
        return request_processor_server_tool_json(self.log, processor_server_base_url=processor_server_base_url)

    async def get_network_agent_ocrd_tool(
        self, processor_name: str, agent_type: AgentType = AgentType.PROCESSING_WORKER
    ) -> Dict:
        ocrd_tool = {}
        error_message = f"Network agent of type '{agent_type}' for processor '{processor_name}' not found."
        if agent_type != AgentType.PROCESSING_WORKER and agent_type != AgentType.PROCESSOR_SERVER:
            message = f"Unknown agent type: {agent_type}, {type(agent_type)}"
            raise_http_exception(self.log, status_code=status.HTTP_501_NOT_IMPLEMENTED, message=message)
        if agent_type == AgentType.PROCESSING_WORKER:
            ocrd_tool = self.ocrd_all_tool_json.get(processor_name, None)
        if agent_type == AgentType.PROCESSOR_SERVER:
            ocrd_tool = self.query_ocrd_tool_json_from_server(processor_name)
        if not ocrd_tool:
            raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, error_message)
        return ocrd_tool

    def network_agent_exists_server(self, processor_name: str) -> bool:
        processor_server_url = self.deployer.resolve_processor_server_url(processor_name)
        return bool(processor_server_url)

    def network_agent_exists_worker(self, processor_name: str) -> bool:
        # TODO: Reconsider and refactor this.
        # Added ocrd-dummy by default if not available for the integration tests.
        # A proper Processing Worker / Processor Server registration endpoint
        # is needed on the Processing Server side
        if processor_name == 'ocrd-dummy':
            return True
        return bool(check_if_queue_exists(self.log, self.rmq_data, processor_name=processor_name))

    def validate_agent_type_and_existence(self, processor_name: str, agent_type: AgentType) -> None:
        agent_exists = False
        if agent_type == AgentType.PROCESSOR_SERVER:
            agent_exists = self.network_agent_exists_server(processor_name=processor_name)
        elif agent_type == AgentType.PROCESSING_WORKER:
            agent_exists = self.network_agent_exists_worker(processor_name=processor_name)
        else:
            message = f"Unknown agent type: {agent_type}, {type(agent_type)}"
            raise_http_exception(self.log, status_code=status.HTTP_501_NOT_IMPLEMENTED, message=message)
        if not agent_exists:
            message = f"Network agent of type '{agent_type}' for processor '{processor_name}' not found."
            raise_http_exception(self.log, status.HTTP_422_UNPROCESSABLE_ENTITY, message)

    async def validate_and_forward_job_to_network_agent(self, processor_name: str, data: PYJobInput) -> PYJobOutput:
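        """Validate a processing request and either cache it or forward it to a network agent.

        The request is validated against the processor's ocrd-tool.json, a job id is generated,
        the workspace is resolved, and the job is cached if it depends on unfinished jobs or
        touches locked pages; otherwise its pages are locked, a METS server is started for the
        workspace and the job is pushed to the queue or the processor server.
        """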
        # Append the processor name to the request itself
        data.processor_name = processor_name
        self.validate_agent_type_and_existence(processor_name=data.processor_name, agent_type=data.agent_type)
        if data.job_id:
            message = f"Processing request job id field is set but must not be: {data.job_id}"
            raise_http_exception(self.log, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
        # Generate processing job id
        data.job_id = generate_id()
        ocrd_tool = await self.get_network_agent_ocrd_tool(
            processor_name=data.processor_name,
            agent_type=data.agent_type
        )
        validate_job_input(self.log, data.processor_name, ocrd_tool, data)

        if data.workspace_id:
            # just a check whether the workspace exists in the database or not
            await get_from_database_workspace(self.log, data.workspace_id)
        else:  # data.path_to_mets provided instead
            await create_workspace_if_not_exists(self.log, mets_path=data.path_to_mets)

        workspace_key = data.path_to_mets if data.path_to_mets else data.workspace_id
        # initialize the request counter for the workspace_key
        self.cache_processing_requests.update_request_counter(workspace_key=workspace_key, by_value=0)

        # This check is done to return early in case a workspace_id is provided
        # but the abs mets path cannot be queried from the DB
        request_mets_path = await validate_and_return_mets_path(self.log, data)

        page_ids = expand_page_ids(data.page_id)

        # A flag whether the current request must be cached
        # This is set to true if for any output file group there
        # is a page_id value that has been previously locked
        cache_current_request = False

        # Check if there are any dependencies of the current request
        if data.depends_on:
            cache_current_request = await self.cache_processing_requests.is_caching_required(data.depends_on)

        # No need for further check of locked pages dependency
        # if the request should be already cached
        if not cache_current_request:
            # Check if there are any locked pages for the current request
            cache_current_request = self.cache_locked_pages.check_if_locked_pages_for_output_file_grps(
                workspace_key=workspace_key,
                output_file_grps=data.output_file_grps,
                page_ids=page_ids
            )

        if cache_current_request:
            # Cache the received request
            self.cache_processing_requests.cache_request(workspace_key, data)

            # Create a cached job DB entry
            db_cached_job = DBProcessorJob(
                **data.dict(exclude_unset=True, exclude_none=True),
                internal_callback_url=self.internal_job_callback_url,
                state=JobState.cached
            )
            await db_cached_job.insert()
            return db_cached_job.to_job_output()

        # Lock the pages in the request
        self.cache_locked_pages.lock_pages(
            workspace_key=workspace_key,
            output_file_grps=data.output_file_grps,
            page_ids=page_ids
        )

        # Start a UDS Mets Server with the current workspace
        ws_dir_path = str(Path(request_mets_path).parent)
        mets_server_url = self.deployer.start_uds_mets_server(ws_dir_path=ws_dir_path)
        if self.use_tcp_mets:
            # let workers talk to mets server via tcp instead of using unix-socket
            mets_server_url = self.multiplexing_endpoint

        # Assign the mets server url in the database (workers read mets_server_url from db)
        await db_update_workspace(
            workspace_id=data.workspace_id,
            workspace_mets_path=data.path_to_mets,
            mets_server_url=mets_server_url
        )

        # Create a queued job DB entry
        db_queued_job = DBProcessorJob(
            **data.dict(exclude_unset=True, exclude_none=True),
            internal_callback_url=self.internal_job_callback_url,
            state=JobState.queued
        )
        await db_queued_job.insert()
        self.cache_processing_requests.update_request_counter(workspace_key=workspace_key, by_value=1)
        job_output = await self.push_job_to_network_agent(data=data, db_job=db_queued_job)
        return job_output

    async def push_job_to_network_agent(self, data: PYJobInput, db_job: DBProcessorJob) -> PYJobOutput:
        if data.agent_type != AgentType.PROCESSING_WORKER and data.agent_type != AgentType.PROCESSOR_SERVER:
            message = f"Unknown agent type: {data.agent_type}, {type(data.agent_type)}"
            raise_http_exception(self.log, status_code=status.HTTP_501_NOT_IMPLEMENTED, message=message)
        job_output = None
        self.log.debug(f"Pushing to {data.agent_type}: {data.processor_name}, {data.page_id}, {data.job_id}")
        if data.agent_type == AgentType.PROCESSING_WORKER:
            job_output = await self.push_job_to_processing_queue(db_job=db_job)
        if data.agent_type == AgentType.PROCESSOR_SERVER:
            job_output = await self.push_job_to_processor_server(job_input=data)
        if not job_output:
            message = f"Failed to create job output for job input: {data}"
            raise_http_exception(self.log, status.HTTP_500_INTERNAL_SERVER_ERROR, message)
        return job_output

    async def push_job_to_processing_queue(self, db_job: DBProcessorJob) -> PYJobOutput:
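        """Encode the job as an OcrdProcessingMessage and publish it to the RabbitMQ queue
        named after the processor."""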
        if not self.rmq_publisher:
            message = "The Processing Server has no connection to RabbitMQ Server. RMQPublisher is not connected."
            raise_http_exception(self.log, status.HTTP_500_INTERNAL_SERVER_ERROR, message)
        processing_message = create_processing_message(self.log, db_job)
        try:
            encoded_message = OcrdProcessingMessage.encode_yml(processing_message)
            self.rmq_publisher.publish_to_queue(queue_name=db_job.processor_name, message=encoded_message)
        except Exception as error:
            message = (
                f"Processing server has failed to push processing message to queue: {db_job.processor_name}, "
                f"Processing message: {processing_message.__dict__}"
            )
            raise_http_exception(self.log, status.HTTP_500_INTERNAL_SERVER_ERROR, message, error)
        return db_job.to_job_output()

    async def push_job_to_processor_server(self, job_input: PYJobInput) -> PYJobOutput:
        processor_server_base_url = self.deployer.resolve_processor_server_url(job_input.processor_name)
        return await forward_job_to_processor_server(
            self.log, job_input=job_input, processor_server_base_url=processor_server_base_url
        )

    async def get_processor_job(self, job_id: str) -> PYJobOutput:
        return await _get_processor_job(self.log, job_id)

    async def get_processor_job_log(self, job_id: str) -> FileResponse:
        return await _get_processor_job_log(self.log, job_id)

    async def _lock_pages_of_workspace(
        self, workspace_key: str, output_file_grps: List[str], page_ids: List[str]
    ) -> None:
        # Lock the output file group pages for the current request
        self.cache_locked_pages.lock_pages(
            workspace_key=workspace_key,
            output_file_grps=output_file_grps,
            page_ids=page_ids
        )

    async def _unlock_pages_of_workspace(
        self, workspace_key: str, output_file_grps: List[str], page_ids: List[str]
    ) -> None:
        self.cache_locked_pages.unlock_pages(
            workspace_key=workspace_key,
            output_file_grps=output_file_grps,
            page_ids=page_ids
        )

    async def push_cached_jobs_to_agents(self, processing_jobs: List[PYJobInput]) -> None:
        if not len(processing_jobs):
            self.log.debug("No processing jobs were consumed from the requests cache")
            return
        for data in processing_jobs:
            self.log.info(f"Changing the job status of: {data.job_id} from {JobState.cached} to {JobState.queued}")
            db_consumed_job = await db_update_processing_job(job_id=data.job_id, state=JobState.queued)
            workspace_key = data.path_to_mets if data.path_to_mets else data.workspace_id

            # Lock the output file group pages for the current request
            await self._lock_pages_of_workspace(
                workspace_key=workspace_key,
                output_file_grps=data.output_file_grps,
                page_ids=expand_page_ids(data.page_id)
            )

            self.cache_processing_requests.update_request_counter(workspace_key=workspace_key, by_value=1)
            job_output = await self.push_job_to_network_agent(data=data, db_job=db_consumed_job)
            if not job_output:
                self.log.exception(f"Failed to create job output for job input data: {data}")

    async def _cancel_cached_dependent_jobs(self, workspace_key: str, job_id: str) -> None:
        await self.cache_processing_requests.cancel_dependent_jobs(
            workspace_key=workspace_key,
            processing_job_id=job_id
        )

    async def _consume_cached_jobs_of_workspace(
        self, workspace_key: str, mets_server_url: str, path_to_mets: str
    ) -> List[PYJobInput]:
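        """Decrease the request counter for the workspace and, once no more callbacks are
        expected, stop its METS server; return the cached requests that can now be consumed."""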
        # decrease the internal cache counter by 1
        request_counter = self.cache_processing_requests.update_request_counter(
            workspace_key=workspace_key, by_value=-1
        )
        self.log.debug(f"Internal processing job cache counter value: {request_counter}")
        if (workspace_key not in self.cache_processing_requests.processing_requests or
                not len(self.cache_processing_requests.processing_requests[workspace_key])):
            if request_counter <= 0:
                # Shut down the Mets Server for the workspace_key since no
                # more internal callbacks are expected for that workspace
                self.log.debug(f"Stopping the mets server: {mets_server_url}")
                self.deployer.stop_uds_mets_server(mets_server_url=mets_server_url, path_to_mets=path_to_mets)

                try:
                    # The queue is empty - delete it
                    del self.cache_processing_requests.processing_requests[workspace_key]
                except KeyError:
                    self.log.warning(f"Trying to delete non-existing internal queue with key: {workspace_key}")

                # For debugging purposes it is good to see if any locked pages are left
                self.log.debug(f"Contents of the locked pages cache for: {workspace_key}")
                locked_pages = self.cache_locked_pages.get_locked_pages(workspace_key=workspace_key)
                for output_file_grp in locked_pages:
                    self.log.debug(f"{output_file_grp}: {locked_pages[output_file_grp]}")
            else:
                self.log.debug(f"Internal request cache is empty but waiting for {request_counter} result callbacks.")
            return []
        # Check whether the internal queue for the workspace key still exists
        if workspace_key not in self.cache_processing_requests.processing_requests:
            self.log.debug(f"No internal queue available for workspace with key: {workspace_key}")
            return []
        consumed_requests = await self.cache_processing_requests.consume_cached_requests(workspace_key=workspace_key)
        return consumed_requests

    async def remove_job_from_request_cache(self, result_message: PYResultMessage):
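        """Result callback endpoint.

        Unlocks the pages of the finished job, cancels cached jobs that depend on it if it
        failed, and pushes any cached jobs that became ready to the network agents."""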
        result_job_id = result_message.job_id
        result_job_state = result_message.state
        path_to_mets = result_message.path_to_mets
        workspace_id = result_message.workspace_id
        self.log.info(f"Result job_id: {result_job_id}, state: {result_job_state}")

        db_workspace = await get_from_database_workspace(self.log, workspace_id, path_to_mets)
        mets_server_url = db_workspace.mets_server_url
        workspace_key = path_to_mets if path_to_mets else workspace_id

        if result_job_state == JobState.failed:
            await self._cancel_cached_dependent_jobs(workspace_key, result_job_id)

        if result_job_state != JobState.success:
            # TODO: Handle other potential error cases
            pass

        try:
            db_result_job = await db_get_processing_job(result_job_id)
            # Unlock the output file group pages for the result processing request
            await self._unlock_pages_of_workspace(
                workspace_key=workspace_key,
                output_file_grps=db_result_job.output_file_grps,
                page_ids=expand_page_ids(db_result_job.page_id)
            )
        except ValueError as error:
            message = f"Processing result job with id '{result_job_id}' not found in the DB."
            raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message, error)

        consumed_cached_jobs = await self._consume_cached_jobs_of_workspace(
            workspace_key=workspace_key, mets_server_url=mets_server_url, path_to_mets=path_to_mets
        )
        await self.push_cached_jobs_to_agents(processing_jobs=consumed_cached_jobs)

    async def list_processors(self) -> List[str]:
        # There is no caching on the Processing Server side
        processor_names_list = self.deployer.find_matching_network_agents(
            docker_only=False, native_only=False, worker_only=False, server_only=False,
            str_names_only=True, unique_only=True, sort=True
        )
        return processor_names_list

    async def task_sequence_to_processing_jobs(
        self,
        tasks: List[ProcessorTask],
        mets_path: str,
        page_id: str,
        agent_type: AgentType = AgentType.PROCESSING_WORKER
    ) -> List[PYJobOutput]:
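        """Translate an ocrd process task sequence into chained processing job requests.

        Jobs are submitted in task order; a task depends on the jobs that produced its input
        file groups, tracked via a temporary file-group-to-job-id cache."""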
        temp_file_group_cache = {}
        responses = []
        for task in tasks:
            # Find dependent jobs of the current task
            dependent_jobs = []
            for input_file_grp in task.input_file_grps:
                if input_file_grp in temp_file_group_cache:
                    dependent_jobs.append(temp_file_group_cache[input_file_grp])
            # NOTE: The `task.mets_path` and `task.page_id` are not utilized on the low level.
            # Thus, setting these two flags in the ocrd process workflow file has no effect
            job_input_data = PYJobInput(
                processor_name=task.executable,
                path_to_mets=mets_path,
                input_file_grps=task.input_file_grps,
                output_file_grps=task.output_file_grps,
                page_id=page_id,
                parameters=task.parameters,
                agent_type=agent_type,
                depends_on=dependent_jobs,
            )
            response = await self.validate_and_forward_job_to_network_agent(
                processor_name=job_input_data.processor_name,
                data=job_input_data
            )
            for file_group in task.output_file_grps:
                temp_file_group_cache[file_group] = response.job_id
            responses.append(response)
        return responses

    def validate_tasks_agents_existence(self, tasks: List[ProcessorTask], agent_type: AgentType) -> None:
        missing_agents = []
        for task in tasks:
            try:
                self.validate_agent_type_and_existence(processor_name=task.executable, agent_type=agent_type)
            except HTTPException:
                # catching the error is not relevant here
                missing_agents.append({task.executable, agent_type})
        if missing_agents:
            message = (
                "Workflow validation has failed. The desired network agents were not found. "
                f"Missing processing agents: {missing_agents}"
            )
            raise_http_exception(self.log, status.HTTP_406_NOT_ACCEPTABLE, message)

    async def run_workflow(
        self,
        mets_path: str,
        workflow: Union[UploadFile, str, None] = File(None),
        workflow_id: str = None,
        agent_type: AgentType = AgentType.PROCESSING_WORKER,
        page_id: str = None,
        page_wise: bool = False,
        workflow_callback_url: str = None
    ) -> PYWorkflowJobOutput:
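        """Run a workflow on the workspace behind mets_path, either as one job chain over all
        pages or, with page_wise=True, as a separate job chain per page; returns the workflow
        job entry created in the database."""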
        await create_workspace_if_not_exists(self.log, mets_path=mets_path)
        workflow_content = await get_workflow_content(self.log, workflow_id, workflow)
        processing_tasks = parse_workflow_tasks(self.log, workflow_content)

        # Validate the input file groups of the first task in the workflow
        validate_first_task_input_file_groups_existence(self.log, mets_path, processing_tasks[0].input_file_grps)

        # Validate existence of agents (processing workers/processor servers)
        # for the ocr-d processors referenced inside tasks
        self.validate_tasks_agents_existence(processing_tasks, agent_type)

        # for page_wise mode, we need to expand the list of pages
        # for the database, it's better to keep a short string
        page_id = page_id or ''
        page_ids = get_page_ids_list(self.log, mets_path, page_id)

        if not page_wise:
            responses = await self.task_sequence_to_processing_jobs(
                tasks=processing_tasks,
                mets_path=mets_path,
                page_id=page_id,
                agent_type=agent_type
            )
            processing_job_ids = [response.job_id for response in responses]
            db_workflow_job = DBWorkflowJob(
                job_id=generate_id(),
                page_id=page_id,
                page_wise=page_wise,
                processing_job_ids={page_id: processing_job_ids},
                path_to_mets=mets_path,
                workflow_callback_url=workflow_callback_url
            )
            await db_workflow_job.insert()
            return db_workflow_job.to_job_output()

        all_pages_job_ids = {}
        for current_page in page_ids:
            responses = await self.task_sequence_to_processing_jobs(
                tasks=processing_tasks,
                mets_path=mets_path,
                page_id=current_page,
                agent_type=agent_type
            )
            processing_job_ids = [response.job_id for response in responses]
            all_pages_job_ids[current_page] = processing_job_ids
        db_workflow_job = DBWorkflowJob(
            job_id=generate_id(),
            page_id=page_id,
            page_wise=page_wise,
            processing_job_ids=all_pages_job_ids,
            path_to_mets=mets_path,
            workflow_callback_url=workflow_callback_url
        )
        await db_workflow_job.insert()
        return db_workflow_job.to_job_output()

    @staticmethod
    def _produce_workflow_status_response(processing_jobs: List[DBProcessorJob]) -> Dict:
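        """Aggregate processing job states into per-processor counts and collect the job and
        page ids of failed tasks under the "failed-processor-tasks" key."""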
        response = {}
        failed_tasks = {}
        failed_tasks_key = "failed-processor-tasks"
        for p_job in processing_jobs:
            response.setdefault(p_job.processor_name, {})
            response[p_job.processor_name].setdefault(p_job.state.value, 0)
            response[p_job.processor_name][p_job.state.value] += 1
            if p_job.state == JobState.failed:
                if failed_tasks_key not in response:
                    response[failed_tasks_key] = failed_tasks
                failed_tasks.setdefault(p_job.processor_name, [])
                failed_tasks[p_job.processor_name].append(
                    {"job_id": p_job.job_id, "page_id": p_job.page_id}
                )
        return response

    @staticmethod
    def _produce_workflow_status_simple_response(processing_jobs: List[DBProcessorJob]) -> JobState:
        workflow_job_state = JobState.unset
        success_jobs = 0
        for p_job in processing_jobs:
            if p_job.state == JobState.cached or p_job.state == JobState.queued:
                continue
            if p_job.state == JobState.failed or p_job.state == JobState.cancelled:
                workflow_job_state = JobState.failed
                break
            if p_job.state == JobState.running:
                workflow_job_state = JobState.running
            if p_job.state == JobState.success:
                success_jobs += 1
        if len(processing_jobs) == success_jobs:
            workflow_job_state = JobState.success
        return workflow_job_state

    async def get_workflow_info(self, workflow_job_id) -> Dict:
        """ Return list of a workflow's processor jobs
        """
        workflow_job = await get_from_database_workflow_job(self.log, workflow_job_id)
        job_ids: List[str] = [job_id for lst in workflow_job.processing_job_ids.values() for job_id in lst]
        jobs = await db_get_processing_jobs(job_ids)
        response = self._produce_workflow_status_response(processing_jobs=jobs)
        return response

    async def kill_mets_server_zombies(self, minutes_ago: Optional[int] = None, dry_run: Optional[bool] = None) -> List[int]:
        pids_killed = kill_mets_server_zombies(minutes_ago=minutes_ago, dry_run=dry_run)
        return pids_killed

    async def get_workflow_info_simple(self, workflow_job_id) -> Dict[str, JobState]:
        """
        Simplified version of `get_workflow_info` that returns a single state for the entire workflow.
        - If a single processing job fails, the entire workflow job status is set to FAILED.
        - If there are any processing jobs running, regardless of other states, such as QUEUED and CACHED,
          the entire workflow job status is set to RUNNING.
        - Only if all processing jobs have finished successfully is the workflow job status set to SUCCESS.
        """
        workflow_job = await get_from_database_workflow_job(self.log, workflow_job_id)
        job_ids: List[str] = [job_id for lst in workflow_job.processing_job_ids.values() for job_id in lst]
        jobs = await db_get_processing_jobs(job_ids)
        workflow_job_state = self._produce_workflow_status_simple_response(processing_jobs=jobs)
        return {"state": workflow_job_state}

    async def upload_workflow(self, workflow: Union[UploadFile, str]) -> Dict[str, str]:
        """ Store a script for a workflow in the database
        """
        if isinstance(workflow, str):
            with open(workflow) as wf_file:
                workflow_content = wf_file.read()
        else:
            workflow_content = await generate_workflow_content(workflow)
        validate_workflow(self.log, workflow_content)
        content_hash = generate_workflow_content_hash(workflow_content)
        try:
            db_workflow_script = await db_find_first_workflow_script_by_content(content_hash)
            if db_workflow_script:
                message = f"The same workflow script already exists, workflow id: {db_workflow_script.workflow_id}"
                raise_http_exception(self.log, status.HTTP_409_CONFLICT, message)
        except ValueError:
            pass
        workflow_id = generate_id()
        db_workflow_script = DBWorkflowScript(
            workflow_id=workflow_id,
            content=workflow_content,
            content_hash=content_hash
        )
        await db_workflow_script.insert()
        return {"workflow_id": workflow_id}

    async def replace_workflow(self, workflow_id, workflow: Union[UploadFile, str]) -> Dict[str, str]:
        """ Update a workflow script file in the database
        """
        try:
            db_workflow_script = await db_get_workflow_script(workflow_id)
            if isinstance(workflow, str):
                with open(workflow) as wf_file:
                    workflow_content = wf_file.read()
            else:
                workflow_content = await generate_workflow_content(workflow)
            validate_workflow(self.log, workflow_content)
            db_workflow_script.content = workflow_content
            content_hash = generate_workflow_content_hash(workflow_content)
            db_workflow_script.content_hash = content_hash
            await db_workflow_script.save()
            return {"workflow_id": db_workflow_script.workflow_id}
        except ValueError as error:
            message = f"Workflow script not existing for id '{workflow_id}'."
            raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message, error)

    async def download_workflow(self, workflow_id) -> PlainTextResponse:
        """ Load workflow-script from the database
        """
        try:
            workflow = await db_get_workflow_script(workflow_id)
            return PlainTextResponse(workflow.content)
        except ValueError as error:
            message = f"Workflow script not existing for id '{workflow_id}'."
            raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message, error)