from datetime import datetime
from os import getpid
from pathlib import Path
from typing import Dict, List, Optional, Union
from uvicorn import run as uvicorn_run

from fastapi import APIRouter, FastAPI, File, HTTPException, Request, status, UploadFile
from fastapi.exceptions import RequestValidationError
from fastapi.responses import FileResponse, JSONResponse, PlainTextResponse

from ocrd.task_sequence import ProcessorTask
from ocrd_utils import initLogging, getLogger
from .constants import AgentType, JobState, ServerApiTags
from .database import (
    initiate_database,
    db_get_processing_job,
    db_get_processing_jobs,
    db_update_processing_job,
    db_update_workspace,
    db_get_workflow_script,
    db_find_first_workflow_script_by_content
)
from .runtime_data import Deployer
from .logging_utils import configure_file_handler_with_formatter, get_processing_server_logging_file_path
from .models import (
    DBProcessorJob,
    DBWorkflowJob,
    DBWorkflowScript,
    PYJobInput,
    PYJobOutput,
    PYResultMessage,
    PYWorkflowJobOutput
)
from .rabbitmq_utils import (
    check_if_queue_exists,
    connect_rabbitmq_publisher,
    create_message_queues,
    OcrdProcessingMessage
)
from .server_cache import CacheLockedPages, CacheProcessingRequests
from .server_utils import (
    create_processing_message,
    create_workspace_if_not_exists,
    forward_job_to_processor_server,
    _get_processor_job,
    _get_processor_job_log,
    get_page_ids_list,
    get_workflow_content,
    get_from_database_workspace,
    get_from_database_workflow_job,
    kill_mets_server_zombies,
    parse_workflow_tasks,
    raise_http_exception,
    request_processor_server_tool_json,
    validate_and_return_mets_path,
    validate_first_task_input_file_groups_existence,
    validate_job_input,
    validate_workflow
)
from .tcp_to_uds_mets_proxy import MetsServerProxy
from .utils import (
    load_ocrd_all_tool_json,
    expand_page_ids,
    generate_id,
    generate_workflow_content,
    generate_workflow_content_hash
)


class ProcessingServer(FastAPI):
    """FastAPI app to make ocr-d processor calls

    The Processing-Server receives calls conforming to the ocr-d webapi regarding the processing
    part. It can run ocrd-processors and provides endpoints to discover processors and watch the job
    status.
    The Processing-Server does not execute the processors itself but starts up a queue and a
    database to delegate the calls to processing workers. They are started by the Processing-Server
    and the communication goes through the queue.
    """

    def __init__(self, config_path: str, host: str, port: int) -> None:
        self.title = "OCR-D Processing Server"
        super().__init__(
            title=self.title,
            on_startup=[self.on_startup],
            on_shutdown=[self.on_shutdown],
            description="OCR-D Processing Server"
        )
        initLogging()
        self.log = getLogger("ocrd_network.processing_server")
        log_file = get_processing_server_logging_file_path(pid=getpid())
        configure_file_handler_with_formatter(self.log, log_file=log_file, mode="a")

        self.log.info("Loading ocrd all tool json")
        self.ocrd_all_tool_json = load_ocrd_all_tool_json()
        self.hostname = host
        self.port = port

        # The deployer is used for:
        # - deploying agents when the Processing Server is started
        # - retrieving runtime data of agents
        self.deployer = Deployer(config_path)
        # Used for forwarding Mets Server TCP requests to UDS requests
        self.mets_server_proxy = MetsServerProxy()
        # If set, all Mets Server UDS requests are multiplexed over TCP
        self.use_tcp_mets = self.deployer.use_tcp_mets
        # Used by processing workers and/or processor servers to report back the results
        if self.deployer.internal_callback_url:
            host = self.deployer.internal_callback_url
            self.internal_job_callback_url = f"{host.rstrip('/')}/result_callback"
            self.multiplexing_endpoint = f"{host.rstrip('/')}/tcp_mets"
        else:
            self.internal_job_callback_url = f"http://{host}:{port}/result_callback"
            self.multiplexing_endpoint = f"http://{host}:{port}/tcp_mets"

        self.mongodb_url = None
        self.rabbitmq_url = None
        self.rmq_data = {
            "host": self.deployer.data_queue.host,
            "port": self.deployer.data_queue.port,
            "vhost": "/",
            "username": self.deployer.data_queue.cred_username,
            "password": self.deployer.data_queue.cred_password
        }

        # Gets assigned when `connect_rabbitmq_publisher()` is called on the working object
        self.rmq_publisher = None

        # Used for keeping track of cached processing requests
        self.cache_processing_requests = CacheProcessingRequests()

        # Used for keeping track of locked/unlocked pages of a workspace
        self.cache_locked_pages = CacheLockedPages()

        self.add_api_routes_others()
        self.add_api_routes_processing()
        self.add_api_routes_workflow()

        @self.exception_handler(RequestValidationError)
        async def validation_exception_handler(request: Request, exc: RequestValidationError):
            exc_str = f'{exc}'.replace('\n', ' ').replace('   ', ' ')
            self.log.error(f'{request}: {exc_str}')
            content = {'status_code': 10422, 'message': exc_str, 'data': None}
            return JSONResponse(content=content, status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)

    def start(self) -> None:
        """ deploy agents (db, queue, workers) and start the processing server with uvicorn
        """
        try:
            self.rabbitmq_url = self.deployer.deploy_rabbitmq()
            self.mongodb_url = self.deployer.deploy_mongodb()

            # The RMQPublisher is initialized and a connection to the RabbitMQ is performed
            self.rmq_publisher = connect_rabbitmq_publisher(self.log, self.rmq_data, enable_acks=True)

            queue_names = self.deployer.find_matching_network_agents(
                worker_only=True, str_names_only=True, unique_only=True
            )
            self.log.info(f"Creating message queues on RabbitMQ instance url: {self.rabbitmq_url}")
            create_message_queues(logger=self.log, rmq_publisher=self.rmq_publisher, queue_names=queue_names)

            self.deployer.deploy_network_agents(mongodb_url=self.mongodb_url, rabbitmq_url=self.rabbitmq_url)
        except Exception as error:
            self.log.exception(f"Failed to start the Processing Server, error: {error}")
            self.log.warning("Trying to stop previously deployed services and network agents.")
            self.deployer.stop_all()
            raise
        uvicorn_run(self, host=self.hostname, port=int(self.port))

    async def on_startup(self):
        self.log.info(f"Initializing the Database on: {self.mongodb_url}")
        await initiate_database(db_url=self.mongodb_url)

    async def on_shutdown(self) -> None:
        """
        - hosts and pids should be stored somewhere
        - ensure queue is empty or processor is not currently running
        - connect to hosts and kill pids
        """
        await self.stop_deployed_agents()

    def add_api_routes_others(self):
        others_router = APIRouter()
        others_router.add_api_route(
            path="/",
            endpoint=self.home_page,
            methods=["GET"],
            status_code=status.HTTP_200_OK,
            summary="Get information about the processing server"
        )
        others_router.add_api_route(
            path="/stop",
            endpoint=self.stop_deployed_agents,
            methods=["POST"],
            tags=[ServerApiTags.TOOLS],
            summary="Stop database, queue and processing-workers"
        )
        others_router.add_api_route(
            path="/tcp_mets",
            methods=["POST"],
            endpoint=self.forward_tcp_request_to_uds_mets_server,
            tags=[ServerApiTags.WORKSPACE],
            summary="Forward a TCP request to UDS mets server"
        )
        others_router.add_api_route(
            path="/kill_mets_server_zombies",
            endpoint=self.kill_mets_server_zombies,
            methods=["DELETE"],
            tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="!! Workaround Do Not Use Unless You Have A Reason !! Kill all METS servers on this machine that have been created more than 60 minutes ago."
        )
        self.include_router(others_router)

    def add_api_routes_processing(self):
        processing_router = APIRouter()
        processing_router.add_api_route(
            path="/processor",
            endpoint=self.list_processors,
            methods=["GET"],
            tags=[ServerApiTags.PROCESSING, ServerApiTags.DISCOVERY],
            status_code=status.HTTP_200_OK,
            summary="Get a list of all available processors"
        )
        processing_router.add_api_route(
            path="/processor/info/{processor_name}",
            endpoint=self.get_network_agent_ocrd_tool,
            methods=["GET"],
            tags=[ServerApiTags.PROCESSING, ServerApiTags.DISCOVERY],
            status_code=status.HTTP_200_OK,
            summary="Get information about this processor"
        )
        processing_router.add_api_route(
            path="/processor/run/{processor_name}",
            endpoint=self.validate_and_forward_job_to_network_agent,
            methods=["POST"],
            tags=[ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Submit a job to this processor",
            response_model=PYJobOutput,
            response_model_exclude_unset=True,
            response_model_exclude_none=True
        )
        processing_router.add_api_route(
            path="/processor/job/{job_id}",
            endpoint=self.get_processor_job,
            methods=["GET"],
            tags=[ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Get information about a job based on its ID",
            response_model=PYJobOutput,
            response_model_exclude_unset=True,
            response_model_exclude_none=True
        )
        processing_router.add_api_route(
            path="/processor/log/{job_id}",
            endpoint=self.get_processor_job_log,
            methods=["GET"],
            tags=[ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Get the log file of a job id"
        )
        processing_router.add_api_route(
            path="/result_callback",
            endpoint=self.remove_job_from_request_cache,
            methods=["POST"],
            tags=[ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Callback used by a worker or processor server for reporting result of a processing request"
        )
        self.include_router(processing_router)
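    # A hedged client-side sketch for the route registered above under
    # /processor/run/{processor_name}. The field names follow how PYJobInput is used in
    # validate_and_forward_job_to_network_agent below; the URL, METS path and file groups
    # are illustrative assumptions, not fixed defaults:
    #
    #   import requests
    #   job = requests.post(
    #       "http://localhost:8000/processor/run/ocrd-dummy",
    #       json={
    #           "path_to_mets": "/data/workspace/mets.xml",
    #           "input_file_grps": ["OCR-D-IMG"],
    #           "output_file_grps": ["OCR-D-DUMMY"],
    #           "parameters": {}
    #       }
    #   ).json()
    #   # poll /processor/job/{job_id} with job["job_id"] to watch the job state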

    def add_api_routes_workflow(self):
        workflow_router = APIRouter()
        workflow_router.add_api_route(
            path="/workflow",
            endpoint=self.upload_workflow,
            methods=["POST"],
            tags=[ServerApiTags.WORKFLOW],
            status_code=status.HTTP_201_CREATED,
            summary="Upload/Register a new workflow script"
        )
        workflow_router.add_api_route(
            path="/workflow/{workflow_id}",
            endpoint=self.download_workflow,
            methods=["GET"],
            tags=[ServerApiTags.WORKFLOW],
            status_code=status.HTTP_200_OK,
            summary="Download a workflow script"
        )
        workflow_router.add_api_route(
            path="/workflow/{workflow_id}",
            endpoint=self.replace_workflow,
            methods=["PUT"],
            tags=[ServerApiTags.WORKFLOW],
            status_code=status.HTTP_200_OK,
            summary="Update/Replace a workflow script"
        )
        workflow_router.add_api_route(
            path="/workflow/run",
            endpoint=self.run_workflow,
            methods=["POST"],
            tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Run a workflow",
            response_model=PYWorkflowJobOutput,
            response_model_exclude_defaults=True,
            response_model_exclude_unset=True,
            response_model_exclude_none=True
        )
        workflow_router.add_api_route(
            path="/workflow/job-simple/{workflow_job_id}",
            endpoint=self.get_workflow_info_simple,
            methods=["GET"],
            tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Get simplified overall job status"
        )
        workflow_router.add_api_route(
            path="/workflow/job/{workflow_job_id}",
            endpoint=self.get_workflow_info,
            methods=["GET"],
            tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Get information about a workflow run"
        )
        self.include_router(workflow_router)

    async def forward_tcp_request_to_uds_mets_server(self, request: Request) -> Dict:
        """Forward a mets-server request

        A processor calls a mets-related method like add_file with ClientSideOcrdMets, which sends
        a request to this endpoint. The request contains all information necessary to make a call
        to the uds-mets-server. `MetsServerProxy` uses this information to make the call to the
        uds-mets-server that is reachable locally (local to the processing-server).
        """
        request_body = await request.json()
        ws_dir_path = request_body["workspace_path"]
        self.deployer.start_uds_mets_server(ws_dir_path=ws_dir_path)
        return self.mets_server_proxy.forward_tcp_request(request_body=request_body)
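    # Minimal sketch of the request body this endpoint expects: only "workspace_path" is
    # read directly above, everything else describes the METS method call and is passed
    # unchanged to MetsServerProxy.forward_tcp_request (the path below is illustrative):
    #
    #   {"workspace_path": "/data/workspace", ...}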

    async def home_page(self):
        message = f"The home page of the {self.title}"
        json_message = {
            "message": message,
            "time": datetime.now().strftime("%Y-%m-%d %H:%M")
        }
        return json_message

    async def stop_deployed_agents(self) -> None:
        self.deployer.stop_all()

    def query_ocrd_tool_json_from_server(self, processor_name: str) -> Dict:
        processor_server_base_url = self.deployer.resolve_processor_server_url(processor_name)
        if processor_server_base_url == '':
            message = f"Processor Server URL of '{processor_name}' not found"
            raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message=message)
        return request_processor_server_tool_json(self.log, processor_server_base_url=processor_server_base_url)

    async def get_network_agent_ocrd_tool(
        self, processor_name: str, agent_type: AgentType = AgentType.PROCESSING_WORKER
    ) -> Dict:
        ocrd_tool = {}
        error_message = f"Network agent of type '{agent_type}' for processor '{processor_name}' not found."
        if agent_type != AgentType.PROCESSING_WORKER and agent_type != AgentType.PROCESSOR_SERVER:
            message = f"Unknown agent type: {agent_type}, {type(agent_type)}"
            raise_http_exception(self.log, status_code=status.HTTP_501_NOT_IMPLEMENTED, message=message)
        if agent_type == AgentType.PROCESSING_WORKER:
            ocrd_tool = self.ocrd_all_tool_json.get(processor_name, None)
        if agent_type == AgentType.PROCESSOR_SERVER:
            ocrd_tool = self.query_ocrd_tool_json_from_server(processor_name)
        if not ocrd_tool:
            raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, error_message)
        return ocrd_tool

    def network_agent_exists_server(self, processor_name: str) -> bool:
        processor_server_url = self.deployer.resolve_processor_server_url(processor_name)
        return bool(processor_server_url)

    def network_agent_exists_worker(self, processor_name: str) -> bool:
        # TODO: Reconsider and refactor this.
        # Added ocrd-dummy by default if not available for the integration tests.
        # A proper Processing Worker / Processor Server registration endpoint
        # is needed on the Processing Server side
        if processor_name == 'ocrd-dummy':
            return True
        return bool(check_if_queue_exists(self.log, self.rmq_data, processor_name=processor_name))

    def validate_agent_type_and_existence(self, processor_name: str, agent_type: AgentType) -> None:
        agent_exists = False
        if agent_type == AgentType.PROCESSOR_SERVER:
            agent_exists = self.network_agent_exists_server(processor_name=processor_name)
        elif agent_type == AgentType.PROCESSING_WORKER:
            agent_exists = self.network_agent_exists_worker(processor_name=processor_name)
        else:
            message = f"Unknown agent type: {agent_type}, {type(agent_type)}"
            raise_http_exception(self.log, status_code=status.HTTP_501_NOT_IMPLEMENTED, message=message)
        if not agent_exists:
            message = f"Network agent of type '{agent_type}' for processor '{processor_name}' not found."
            raise_http_exception(self.log, status.HTTP_422_UNPROCESSABLE_ENTITY, message)

    async def validate_and_forward_job_to_network_agent(self, processor_name: str, data: PYJobInput) -> PYJobOutput:
        # Append the processor name to the request itself
        data.processor_name = processor_name
        self.validate_agent_type_and_existence(processor_name=data.processor_name, agent_type=data.agent_type)
        if data.job_id:
            message = f"Processing request job id field is set but must not be: {data.job_id}"
            raise_http_exception(self.log, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
        # Generate processing job id
        data.job_id = generate_id()
        ocrd_tool = await self.get_network_agent_ocrd_tool(
            processor_name=data.processor_name,
            agent_type=data.agent_type
        )
        validate_job_input(self.log, data.processor_name, ocrd_tool, data)

        if data.workspace_id:
            # just a check whether the workspace exists in the database or not
            await get_from_database_workspace(self.log, data.workspace_id)
        else:  # data.path_to_mets provided instead
            await create_workspace_if_not_exists(self.log, mets_path=data.path_to_mets)

        workspace_key = data.path_to_mets if data.path_to_mets else data.workspace_id
        # initialize the request counter for the workspace_key
        self.cache_processing_requests.update_request_counter(workspace_key=workspace_key, by_value=0)

        # This check is done to return early in case a workspace_id is provided
        # but the absolute mets path cannot be queried from the DB
        request_mets_path = await validate_and_return_mets_path(self.log, data)

        page_ids = expand_page_ids(data.page_id)

        # A flag whether the current request must be cached
        # This is set to true if for any output file group there
        # is a page_id value that has been previously locked
        cache_current_request = False

        # Check if there are any dependencies of the current request
        if data.depends_on:
            cache_current_request = await self.cache_processing_requests.is_caching_required(data.depends_on)

        # No need for a further locked-pages check
        # if the request is going to be cached anyway
        if not cache_current_request:
            # Check if there are any locked pages for the current request
            cache_current_request = self.cache_locked_pages.check_if_locked_pages_for_output_file_grps(
                workspace_key=workspace_key,
                output_file_grps=data.output_file_grps,
                page_ids=page_ids
            )

        if cache_current_request:
            # Cache the received request
            self.cache_processing_requests.cache_request(workspace_key, data)

            # Create a cached job DB entry
            db_cached_job = DBProcessorJob(
                **data.dict(exclude_unset=True, exclude_none=True),
                internal_callback_url=self.internal_job_callback_url,
                state=JobState.cached
            )
            await db_cached_job.insert()
            return db_cached_job.to_job_output()

        # Lock the pages in the request
        self.cache_locked_pages.lock_pages(
            workspace_key=workspace_key,
            output_file_grps=data.output_file_grps,
            page_ids=page_ids
        )

        # Start a UDS Mets Server with the current workspace
        ws_dir_path = str(Path(request_mets_path).parent)
        mets_server_url = self.deployer.start_uds_mets_server(ws_dir_path=ws_dir_path)
        if self.use_tcp_mets:
            # let workers talk to the mets server via tcp instead of using a unix socket
            mets_server_url = self.multiplexing_endpoint

        # Assign the mets server url in the database (workers read mets_server_url from db)
        await db_update_workspace(
            workspace_id=data.workspace_id,
            workspace_mets_path=data.path_to_mets,
            mets_server_url=mets_server_url
        )

        # Create a queued job DB entry
        db_queued_job = DBProcessorJob(
            **data.dict(exclude_unset=True, exclude_none=True),
            internal_callback_url=self.internal_job_callback_url,
            state=JobState.queued
        )
        await db_queued_job.insert()
        self.cache_processing_requests.update_request_counter(workspace_key=workspace_key, by_value=1)
        job_output = await self.push_job_to_network_agent(data=data, db_job=db_queued_job)
        return job_output

    async def push_job_to_network_agent(self, data: PYJobInput, db_job: DBProcessorJob) -> PYJobOutput:
        if data.agent_type != AgentType.PROCESSING_WORKER and data.agent_type != AgentType.PROCESSOR_SERVER:
            message = f"Unknown agent type: {data.agent_type}, {type(data.agent_type)}"
            raise_http_exception(self.log, status_code=status.HTTP_501_NOT_IMPLEMENTED, message=message)
        job_output = None
        self.log.debug(f"Pushing to {data.agent_type}: {data.processor_name}, {data.page_id}, {data.job_id}")
        if data.agent_type == AgentType.PROCESSING_WORKER:
            job_output = await self.push_job_to_processing_queue(db_job=db_job)
        if data.agent_type == AgentType.PROCESSOR_SERVER:
            job_output = await self.push_job_to_processor_server(job_input=data)
        if not job_output:
            message = f"Failed to create job output for job input: {data}"
            raise_http_exception(self.log, status.HTTP_500_INTERNAL_SERVER_ERROR, message)
        return job_output

    async def push_job_to_processing_queue(self, db_job: DBProcessorJob) -> PYJobOutput:
        if not self.rmq_publisher:
            message = "The Processing Server has no connection to RabbitMQ Server. RMQPublisher is not connected."
            raise_http_exception(self.log, status.HTTP_500_INTERNAL_SERVER_ERROR, message)
        processing_message = create_processing_message(self.log, db_job)
        try:
            encoded_message = OcrdProcessingMessage.encode_yml(processing_message)
            self.rmq_publisher.publish_to_queue(queue_name=db_job.processor_name, message=encoded_message)
        except Exception as error:
            message = (
                f"Processing server has failed to push processing message to queue: {db_job.processor_name}, "
                f"Processing message: {processing_message.__dict__}"
            )
            raise_http_exception(self.log, status.HTTP_500_INTERNAL_SERVER_ERROR, message, error)
        return db_job.to_job_output()

    async def push_job_to_processor_server(self, job_input: PYJobInput) -> PYJobOutput:
        processor_server_base_url = self.deployer.resolve_processor_server_url(job_input.processor_name)
        return await forward_job_to_processor_server(
            self.log, job_input=job_input, processor_server_base_url=processor_server_base_url
        )

    async def get_processor_job(self, job_id: str) -> PYJobOutput:
        return await _get_processor_job(self.log, job_id)

    async def get_processor_job_log(self, job_id: str) -> FileResponse:
        return await _get_processor_job_log(self.log, job_id)

    async def _lock_pages_of_workspace(
        self, workspace_key: str, output_file_grps: List[str], page_ids: List[str]
    ) -> None:
        # Lock the output file group pages for the current request
        self.cache_locked_pages.lock_pages(
            workspace_key=workspace_key,
            output_file_grps=output_file_grps,
            page_ids=page_ids
        )

    async def _unlock_pages_of_workspace(
        self, workspace_key: str, output_file_grps: List[str], page_ids: List[str]
    ) -> None:
        self.cache_locked_pages.unlock_pages(
            workspace_key=workspace_key,
            output_file_grps=output_file_grps,
            page_ids=page_ids
        )

    async def push_cached_jobs_to_agents(self, processing_jobs: List[PYJobInput]) -> None:
        if not len(processing_jobs):
            self.log.debug("No processing jobs were consumed from the requests cache")
            return
        for data in processing_jobs:
            self.log.info(f"Changing the job status of: {data.job_id} from {JobState.cached} to {JobState.queued}")
            db_consumed_job = await db_update_processing_job(job_id=data.job_id, state=JobState.queued)
            workspace_key = data.path_to_mets if data.path_to_mets else data.workspace_id

            # Lock the output file group pages for the current request
            await self._lock_pages_of_workspace(
                workspace_key=workspace_key,
                output_file_grps=data.output_file_grps,
                page_ids=expand_page_ids(data.page_id)
            )

            self.cache_processing_requests.update_request_counter(workspace_key=workspace_key, by_value=1)
            job_output = await self.push_job_to_network_agent(data=data, db_job=db_consumed_job)
            if not job_output:
                self.log.exception(f"Failed to create job output for job input data: {data}")

    async def _cancel_cached_dependent_jobs(self, workspace_key: str, job_id: str) -> None:
        await self.cache_processing_requests.cancel_dependent_jobs(
            workspace_key=workspace_key,
            processing_job_id=job_id
        )

    async def _consume_cached_jobs_of_workspace(
        self, workspace_key: str, mets_server_url: str, path_to_mets: str
    ) -> List[PYJobInput]:
        # decrease the internal cache counter by 1
        request_counter = self.cache_processing_requests.update_request_counter(
            workspace_key=workspace_key, by_value=-1
        )
        self.log.debug(f"Internal processing job cache counter value: {request_counter}")
        if (workspace_key not in self.cache_processing_requests.processing_requests or
                not len(self.cache_processing_requests.processing_requests[workspace_key])):
            if request_counter <= 0:
                # Shut down the Mets Server for the workspace_key since no
                # more internal callbacks are expected for that workspace
                self.log.debug(f"Stopping the mets server: {mets_server_url}")
                self.deployer.stop_uds_mets_server(mets_server_url=mets_server_url, path_to_mets=path_to_mets)

                try:
                    # The queue is empty - delete it
                    del self.cache_processing_requests.processing_requests[workspace_key]
                except KeyError:
                    self.log.warning(f"Trying to delete non-existing internal queue with key: {workspace_key}")

                # For debugging purposes it is good to see if any locked pages are left
                self.log.debug(f"Contents of the locked pages cache for: {workspace_key}")
                locked_pages = self.cache_locked_pages.get_locked_pages(workspace_key=workspace_key)
                for output_file_grp in locked_pages:
                    self.log.debug(f"{output_file_grp}: {locked_pages[output_file_grp]}")
            else:
                self.log.debug(f"Internal request cache is empty but waiting for {request_counter} result callbacks.")
            return []
        # Check whether the internal queue for the workspace key still exists
        if workspace_key not in self.cache_processing_requests.processing_requests:
            self.log.debug(f"No internal queue available for workspace with key: {workspace_key}")
            return []
        consumed_requests = await self.cache_processing_requests.consume_cached_requests(workspace_key=workspace_key)
        return consumed_requests

    async def remove_job_from_request_cache(self, result_message: PYResultMessage):
        result_job_id = result_message.job_id
        result_job_state = result_message.state
        path_to_mets = result_message.path_to_mets
        workspace_id = result_message.workspace_id
        self.log.info(f"Result job_id: {result_job_id}, state: {result_job_state}")

        db_workspace = await get_from_database_workspace(self.log, workspace_id, path_to_mets)
        mets_server_url = db_workspace.mets_server_url
        workspace_key = path_to_mets if path_to_mets else workspace_id

        if result_job_state == JobState.failed:
            await self._cancel_cached_dependent_jobs(workspace_key, result_job_id)

        if result_job_state != JobState.success:
            # TODO: Handle other potential error cases
            pass

        try:
            db_result_job = await db_get_processing_job(result_job_id)
            # Unlock the output file group pages for the result processing request
            await self._unlock_pages_of_workspace(
                workspace_key=workspace_key,
                output_file_grps=db_result_job.output_file_grps,
                page_ids=expand_page_ids(db_result_job.page_id)
            )
        except ValueError as error:
            message = f"Processing result job with id '{result_job_id}' not found in the DB."
            raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message, error)

        consumed_cached_jobs = await self._consume_cached_jobs_of_workspace(
            workspace_key=workspace_key, mets_server_url=mets_server_url, path_to_mets=path_to_mets
        )
        await self.push_cached_jobs_to_agents(processing_jobs=consumed_cached_jobs)

    async def list_processors(self) -> List[str]:
        # There is no caching on the Processing Server side
        processor_names_list = self.deployer.find_matching_network_agents(
            docker_only=False, native_only=False, worker_only=False, server_only=False,
            str_names_only=True, unique_only=True, sort=True
        )
        return processor_names_list

    async def task_sequence_to_processing_jobs(
        self,
        tasks: List[ProcessorTask],
        mets_path: str,
        page_id: str,
        agent_type: AgentType = AgentType.PROCESSING_WORKER
    ) -> List[PYJobOutput]:
        temp_file_group_cache = {}
        responses = []
        for task in tasks:
            # Find dependent jobs of the current task
            dependent_jobs = []
            for input_file_grp in task.input_file_grps:
                if input_file_grp in temp_file_group_cache:
                    dependent_jobs.append(temp_file_group_cache[input_file_grp])
            # NOTE: `task.mets_path` and `task.page_id` are not utilized at this lower level.
            # Thus, setting these two flags in the ocrd process workflow file has no effect.
            job_input_data = PYJobInput(
                processor_name=task.executable,
                path_to_mets=mets_path,
                input_file_grps=task.input_file_grps,
                output_file_grps=task.output_file_grps,
                page_id=page_id,
                parameters=task.parameters,
                agent_type=agent_type,
                depends_on=dependent_jobs,
            )
            response = await self.validate_and_forward_job_to_network_agent(
                processor_name=job_input_data.processor_name,
                data=job_input_data
            )
            for file_group in task.output_file_grps:
                temp_file_group_cache[file_group] = response.job_id
            responses.append(response)
        return responses
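    # Illustration of the dependency chaining above (the file group name and job ids are
    # hypothetical): if a later task reads an input file group, e.g. OCR-D-A, that an earlier
    # task in the same sequence declared as output, the later job is submitted with
    # depends_on=[<job_id of the earlier job>], because that job registered OCR-D-A in
    # temp_file_group_cache when its response came back.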

    def validate_tasks_agents_existence(self, tasks: List[ProcessorTask], agent_type: AgentType) -> None:
        missing_agents = []
        for task in tasks:
            try:
                self.validate_agent_type_and_existence(processor_name=task.executable, agent_type=agent_type)
            except HTTPException:
                # catching the error is not relevant here
                missing_agents.append({task.executable, agent_type})
        if missing_agents:
            message = (
                "Workflow validation has failed. The desired network agents were not found. "
                f"Missing processing agents: {missing_agents}"
            )
            raise_http_exception(self.log, status.HTTP_406_NOT_ACCEPTABLE, message)

    async def run_workflow(
        self,
        mets_path: str,
        workflow: Union[UploadFile, None] = File(None),
        workflow_id: str = None,
        agent_type: AgentType = AgentType.PROCESSING_WORKER,
        page_id: str = None,
        page_wise: bool = False,
        workflow_callback_url: str = None
    ) -> PYWorkflowJobOutput:
        await create_workspace_if_not_exists(self.log, mets_path=mets_path)
        workflow_content = await get_workflow_content(self.log, workflow_id, workflow)
        processing_tasks = parse_workflow_tasks(self.log, workflow_content)

        # Validate the input file groups of the first task in the workflow
        validate_first_task_input_file_groups_existence(self.log, mets_path, processing_tasks[0].input_file_grps)

        # Validate existence of agents (processing workers/processor servers)
        # for the ocr-d processors referenced inside tasks
        self.validate_tasks_agents_existence(processing_tasks, agent_type)

        page_ids = get_page_ids_list(self.log, mets_path, page_id)

        # TODO: Reconsider this, the compact page range may not always work if the page_ids are hashes!
        compact_page_range = f"{page_ids[0]}..{page_ids[-1]}"

        if not page_wise:
            responses = await self.task_sequence_to_processing_jobs(
                tasks=processing_tasks,
                mets_path=mets_path,
                page_id=compact_page_range,
                agent_type=agent_type
            )
            processing_job_ids = [response.job_id for response in responses]
            db_workflow_job = DBWorkflowJob(
                job_id=generate_id(),
                page_id=compact_page_range,
                page_wise=page_wise,
                processing_job_ids={compact_page_range: processing_job_ids},
                path_to_mets=mets_path,
                workflow_callback_url=workflow_callback_url
            )
            await db_workflow_job.insert()
            return db_workflow_job.to_job_output()

        all_pages_job_ids = {}
        for current_page in page_ids:
            responses = await self.task_sequence_to_processing_jobs(
                tasks=processing_tasks,
                mets_path=mets_path,
                page_id=current_page,
                agent_type=agent_type
            )
            processing_job_ids = [response.job_id for response in responses]
            all_pages_job_ids[current_page] = processing_job_ids
        db_workflow_job = DBWorkflowJob(
            job_id=generate_id(),
            page_id=compact_page_range,
            page_wise=page_wise,
            processing_job_ids=all_pages_job_ids,
            path_to_mets=mets_path,
            workflow_callback_url=workflow_callback_url
        )
        await db_workflow_job.insert()
        return db_workflow_job.to_job_output()
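    # A hedged client-side sketch of starting a workflow run via /workflow/run. Parameter
    # names follow the run_workflow signature; that the scalar arguments map to query
    # parameters and `workflow` to a multipart file upload is an assumption of this sketch,
    # and the paths and URL are illustrative:
    #
    #   import requests
    #   with open("workflow.txt", "rb") as fh:
    #       wf_job = requests.post(
    #           "http://localhost:8000/workflow/run",
    #           params={"mets_path": "/data/workspace/mets.xml", "page_wise": "false"},
    #           files={"workflow": fh}
    #       ).json()
    #   # poll /workflow/job-simple/{workflow_job_id} for the overall state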

    @staticmethod
    def _produce_workflow_status_response(processing_jobs: List[DBProcessorJob]) -> Dict:
        response = {}
        failed_tasks = {}
        failed_tasks_key = "failed-processor-tasks"
        for p_job in processing_jobs:
            response.setdefault(p_job.processor_name, {})
            response[p_job.processor_name].setdefault(p_job.state.value, 0)
            response[p_job.processor_name][p_job.state.value] += 1
            if p_job.state == JobState.failed:
                if failed_tasks_key not in response:
                    response[failed_tasks_key] = failed_tasks
                failed_tasks.setdefault(p_job.processor_name, [])
                failed_tasks[p_job.processor_name].append(
                    {"job_id": p_job.job_id, "page_id": p_job.page_id}
                )
        return response

    @staticmethod
    def _produce_workflow_status_simple_response(processing_jobs: List[DBProcessorJob]) -> JobState:
        workflow_job_state = JobState.unset
        success_jobs = 0
        for p_job in processing_jobs:
            if p_job.state == JobState.cached or p_job.state == JobState.queued:
                continue
            if p_job.state == JobState.failed or p_job.state == JobState.cancelled:
                workflow_job_state = JobState.failed
                break
            if p_job.state == JobState.running:
                workflow_job_state = JobState.running
            if p_job.state == JobState.success:
                success_jobs += 1
        if len(processing_jobs) == success_jobs:
            workflow_job_state = JobState.success
        return workflow_job_state

    async def get_workflow_info(self, workflow_job_id) -> Dict:
        """ Return list of a workflow's processor jobs
        """
        workflow_job = await get_from_database_workflow_job(self.log, workflow_job_id)
        job_ids: List[str] = [job_id for lst in workflow_job.processing_job_ids.values() for job_id in lst]
        jobs = await db_get_processing_jobs(job_ids)
        response = self._produce_workflow_status_response(processing_jobs=jobs)
        return response

    async def kill_mets_server_zombies(self, minutes_ago: Optional[int] = None, dry_run: Optional[bool] = None) -> List[int]:
        pids_killed = kill_mets_server_zombies(minutes_ago=minutes_ago, dry_run=dry_run)
        return pids_killed

    async def get_workflow_info_simple(self, workflow_job_id) -> Dict[str, JobState]:
        """
        Simplified version of `get_workflow_info` that returns a single state for the entire workflow.
        - If a single processing job fails, the entire workflow job status is set to FAILED.
        - If there are any processing jobs running, regardless of other states, such as QUEUED and CACHED,
          the entire workflow job status is set to RUNNING.
        - Only if all processing jobs have finished successfully is the workflow job status set to SUCCESS.
        """
        workflow_job = await get_from_database_workflow_job(self.log, workflow_job_id)
        job_ids: List[str] = [job_id for lst in workflow_job.processing_job_ids.values() for job_id in lst]
        jobs = await db_get_processing_jobs(job_ids)
        workflow_job_state = self._produce_workflow_status_simple_response(processing_jobs=jobs)
        return {"state": workflow_job_state}

    async def upload_workflow(self, workflow: UploadFile) -> Dict[str, str]:
        """ Store a script for a workflow in the database
        """
        workflow_content = await generate_workflow_content(workflow)
        validate_workflow(self.log, workflow_content)
        content_hash = generate_workflow_content_hash(workflow_content)
        try:
            db_workflow_script = await db_find_first_workflow_script_by_content(content_hash)
            if db_workflow_script:
                message = f"The same workflow script already exists, workflow id: {db_workflow_script.workflow_id}"
                raise_http_exception(self.log, status.HTTP_409_CONFLICT, message)
        except ValueError:
            pass
        workflow_id = generate_id()
        db_workflow_script = DBWorkflowScript(
            workflow_id=workflow_id,
            content=workflow_content,
            content_hash=content_hash
        )
        await db_workflow_script.insert()
        return {"workflow_id": workflow_id}

    async def replace_workflow(self, workflow_id, workflow: UploadFile) -> Dict[str, str]:
        """ Update a workflow script file in the database
        """
        try:
            db_workflow_script = await db_get_workflow_script(workflow_id)
            workflow_content = await generate_workflow_content(workflow)
            validate_workflow(self.log, workflow_content)
            db_workflow_script.content = workflow_content
            content_hash = generate_workflow_content_hash(workflow_content)
            db_workflow_script.content_hash = content_hash
            await db_workflow_script.save()
            return {"workflow_id": db_workflow_script.workflow_id}
        except ValueError as error:
            message = f"Workflow script not found for id '{workflow_id}'."
            raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message, error)

    async def download_workflow(self, workflow_id) -> PlainTextResponse:
        """ Load workflow-script from the database
        """
        try:
            workflow = await db_get_workflow_script(workflow_id)
            return PlainTextResponse(workflow.content)
        except ValueError as error:
            message = f"Workflow script not found for id '{workflow_id}'."
            raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message, error)