from datetime import datetime
from os import getpid
from pathlib import Path
from typing import Dict, List, Optional, Union
from uvicorn import run as uvicorn_run

from fastapi import APIRouter, FastAPI, File, HTTPException, Request, status, UploadFile
from fastapi.exceptions import RequestValidationError
from fastapi.responses import FileResponse, JSONResponse, PlainTextResponse

from ocrd.task_sequence import ProcessorTask
from ocrd_utils import initLogging, getLogger
from .constants import JobState, ServerApiTags
from .database import (
    initiate_database,
    db_get_processing_job,
    db_get_processing_jobs,
    db_update_processing_job,
    db_update_workspace,
    db_get_workflow_script,
    db_find_first_workflow_script_by_content
)
from .runtime_data import Deployer
from .logging_utils import configure_file_handler_with_formatter, get_processing_server_logging_file_path
from .models import (
    DBProcessorJob,
    DBWorkflowJob,
    DBWorkflowScript,
    PYJobInput,
    PYJobOutput,
    PYResultMessage,
    PYWorkflowJobOutput
)
from .rabbitmq_utils import (
    check_if_queue_exists,
    connect_rabbitmq_publisher,
    get_message_queues,
    OcrdProcessingMessage
)
from .server_cache import CacheLockedPages, CacheProcessingRequests
from .server_utils import (
    create_processing_message,
    create_workspace_if_not_exists,
    _get_processor_job,
    _get_processor_job_log,
    get_page_ids_list,
    get_workflow_content,
    get_from_database_workspace,
    get_from_database_workflow_job,
    kill_mets_server_zombies,
    parse_workflow_tasks,
    raise_http_exception,
    validate_and_return_mets_path,
    validate_first_task_input_file_groups_existence,
    validate_job_input,
    validate_workflow
)
from .tcp_to_uds_mets_proxy import MetsServerProxy
from .utils import (
    load_ocrd_all_tool_json,
    expand_page_ids,
    generate_id,
    generate_workflow_content,
    generate_workflow_content_hash
)

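# Usage sketch (illustrative only, not part of this module's API surface): the server is
# constructed from a deployment config file and then started, which deploys the agents and
# blocks inside uvicorn. The config path, host and port below are placeholder values.
#
#   server = ProcessingServer(config_path="ps_config.yaml", host="0.0.0.0", port=8000)
#   server.start()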
class ProcessingServer(FastAPI):
    """FastAPI app to make OCR-D processor calls

    The Processing Server receives calls conforming to the OCR-D web API regarding the processing
    part. It can run OCR-D processors and provides endpoints to discover processors and to watch
    the job status.
    The Processing Server does not execute the processors itself but starts up a queue and a
    database to delegate the calls to Processing Workers. The workers are started by the
    Processing Server and the communication goes through the queue.
    """

    def __init__(self, config_path: str, host: str, port: int) -> None:
        self.title = "OCR-D Processing Server"
        super().__init__(
            title=self.title,
            on_startup=[self.on_startup],
            on_shutdown=[self.on_shutdown],
            description="OCR-D Processing Server"
        )
        initLogging()
        self.log = getLogger("ocrd_network.processing_server")
        log_file = get_processing_server_logging_file_path(pid=getpid())
        configure_file_handler_with_formatter(self.log, log_file=log_file, mode="a")

        self.log.info("Loading ocrd-all-tool.json")
        self.ocrd_all_tool_json = load_ocrd_all_tool_json()
        self.hostname = host
        self.port = port

        # The deployer is used for:
        # - deploying agents when the Processing Server is started
        # - retrieving runtime data of agents
        self.deployer = Deployer(config_path)
        # Used for forwarding Mets Server TCP requests to UDS requests
        self.mets_server_proxy = MetsServerProxy()
        # If set, all Mets Server UDS requests are multiplexed over TCP
        self.use_tcp_mets = self.deployer.use_tcp_mets
        # Used by processing workers to report back the results
        if self.deployer.internal_callback_url:
            host = self.deployer.internal_callback_url
            self.internal_job_callback_url = f"{host.rstrip('/')}/result_callback"
            self.multiplexing_endpoint = f"{host.rstrip('/')}/tcp_mets"
        else:
            self.internal_job_callback_url = f"http://{host}:{port}/result_callback"
            self.multiplexing_endpoint = f"http://{host}:{port}/tcp_mets"

        self.mongodb_url = None
        self.rabbitmq_url = None
        self.rmq_data = {
            "host": self.deployer.data_queue.host,
            "port": self.deployer.data_queue.port,
            "vhost": "/",
            "username": self.deployer.data_queue.cred_username,
            "password": self.deployer.data_queue.cred_password
        }

        # Gets assigned when `connect_rabbitmq_publisher()` is called on the working object
        self.rmq_publisher = None

        # Used for keeping track of cached processing requests
        self.cache_processing_requests = CacheProcessingRequests()

        # Used for keeping track of locked/unlocked pages of a workspace
        self.cache_locked_pages = CacheLockedPages()

        self.add_api_routes_others()
        self.add_api_routes_processing()
        self.add_api_routes_workflow()

        @self.exception_handler(RequestValidationError)
        async def validation_exception_handler(request: Request, exc: RequestValidationError):
            exc_str = f'{exc}'.replace('\n', ' ').replace('   ', ' ')
            self.log.error(f'{request}: {exc_str}')
            content = {'status_code': 10422, 'message': exc_str, 'data': None}
            return JSONResponse(content=content, status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)

    def start(self) -> None:
        """ deploy agents (db, queue, workers) and start the processing server with uvicorn
        """
        try:
            self.rabbitmq_url = self.deployer.deploy_rabbitmq()
            self.mongodb_url = self.deployer.deploy_mongodb()

            # The RMQPublisher is initialized and a connection to the RabbitMQ is performed
            self.rmq_publisher = connect_rabbitmq_publisher(self.log, self.rmq_data, enable_acks=True)

            self.deployer.deploy_workers(mongodb_url=self.mongodb_url, rabbitmq_url=self.rabbitmq_url)
        except Exception as error:
            self.log.exception(f"Failed to start the Processing Server, error: {error}")
            self.log.warning("Trying to stop previously deployed services and workers.")
            self.deployer.stop_all()
            raise
        uvicorn_run(self, host=self.hostname, port=int(self.port))

    async def on_startup(self):
        self.log.info(f"Initializing the Database on: {self.mongodb_url}")
        await initiate_database(db_url=self.mongodb_url)

    async def on_shutdown(self) -> None:
        """
        - hosts and pids should be stored somewhere
        - ensure queue is empty or processor is not currently running
        - connect to hosts and kill pids
        """
        await self.stop_deployed_agents()

    def add_api_routes_others(self):
        others_router = APIRouter()
        others_router.add_api_route(
            path="/",
            endpoint=self.home_page,
            methods=["GET"],
            status_code=status.HTTP_200_OK,
            summary="Get information about the processing server"
        )
        others_router.add_api_route(
            path="/stop",
            endpoint=self.stop_deployed_agents,
            methods=["POST"],
            tags=[ServerApiTags.TOOLS],
            summary="Stop database, queue and processing-workers"
        )
        others_router.add_api_route(
            path="/tcp_mets",
            methods=["POST"],
            endpoint=self.forward_tcp_request_to_uds_mets_server,
            tags=[ServerApiTags.WORKSPACE],
            summary="Forward a TCP request to UDS mets server"
        )
        others_router.add_api_route(
            path="/kill_mets_server_zombies",
            endpoint=self.kill_mets_server_zombies,
            methods=["DELETE"],
            tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="!! Workaround Do Not Use Unless You Have A Reason "
                    "!! Kill all METS servers on this machine that have been created more than 60 minutes ago."
        )
        self.include_router(others_router)

    def add_api_routes_processing(self):
        processing_router = APIRouter()
        processing_router.add_api_route(
            path="/processor",
            endpoint=self.list_processors,
            methods=["GET"],
            tags=[ServerApiTags.PROCESSING, ServerApiTags.DISCOVERY],
            status_code=status.HTTP_200_OK,
            summary="Get a list of all available processors"
        )
        processing_router.add_api_route(
            path="/processor/info/{processor_name}",
            endpoint=self.get_worker_ocrd_tool,
            methods=["GET"],
            tags=[ServerApiTags.PROCESSING, ServerApiTags.DISCOVERY],
            status_code=status.HTTP_200_OK,
            summary="Get information about this processor"
        )
        processing_router.add_api_route(
            path="/processor/run/{processor_name}",
            endpoint=self.validate_and_forward_job_to_worker,
            methods=["POST"],
            tags=[ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Submit a job to this processor",
            response_model=PYJobOutput,
            response_model_exclude_unset=True,
            response_model_exclude_none=True
        )
        processing_router.add_api_route(
            path="/processor/job/{job_id}",
            endpoint=self.get_processor_job,
            methods=["GET"],
            tags=[ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Get information about a job based on its ID",
            response_model=PYJobOutput,
            response_model_exclude_unset=True,
            response_model_exclude_none=True
        )
        processing_router.add_api_route(
            path="/processor/log/{job_id}",
            endpoint=self.get_processor_job_log,
            methods=["GET"],
            tags=[ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Get the log file of a job id"
        )
        processing_router.add_api_route(
            path="/result_callback",
            endpoint=self.remove_job_from_request_cache,
            methods=["POST"],
            tags=[ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Callback used by a worker for reporting result of a processing request"
        )
        self.include_router(processing_router)

    def add_api_routes_workflow(self):
        workflow_router = APIRouter()
        workflow_router.add_api_route(
            path="/workflow",
            endpoint=self.upload_workflow,
            methods=["POST"],
            tags=[ServerApiTags.WORKFLOW],
            status_code=status.HTTP_201_CREATED,
            summary="Upload/Register a new workflow script"
        )
        workflow_router.add_api_route(
            path="/workflow/{workflow_id}",
            endpoint=self.download_workflow,
            methods=["GET"],
            tags=[ServerApiTags.WORKFLOW],
            status_code=status.HTTP_200_OK,
            summary="Download a workflow script"
        )
        workflow_router.add_api_route(
            path="/workflow/{workflow_id}",
            endpoint=self.replace_workflow,
            methods=["PUT"],
            tags=[ServerApiTags.WORKFLOW],
            status_code=status.HTTP_200_OK,
            summary="Update/Replace a workflow script"
        )
        workflow_router.add_api_route(
            path="/workflow/run",
            endpoint=self.run_workflow,
            methods=["POST"],
            tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Run a workflow",
            response_model=PYWorkflowJobOutput,
            response_model_exclude_defaults=True,
            response_model_exclude_unset=True,
            response_model_exclude_none=True
        )
        workflow_router.add_api_route(
            path="/workflow/job-simple/{workflow_job_id}",
            endpoint=self.get_workflow_info_simple,
            methods=["GET"],
            tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Get simplified overall job status"
        )
        workflow_router.add_api_route(
            path="/workflow/job/{workflow_job_id}",
            endpoint=self.get_workflow_info,
            methods=["GET"],
            tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Get information about a workflow run"
        )
        self.include_router(workflow_router)

    async def forward_tcp_request_to_uds_mets_server(self, request: Request) -> Dict:
        """Forward a mets-server request

        A processor calls a METS-related method such as add_file through ClientSideOcrdMets, which
        sends a request to this endpoint. The request contains all information necessary to make a
        call to the UDS mets server. `MetsServerProxy` uses this information to make the call to
        the UDS mets server that is reachable locally (local to the Processing Server).
        """
        request_body = await request.json()
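        # Note: apart from "workspace_path", which is read below to start the UDS mets server for
        # that workspace, the body is handed over as-is to MetsServerProxy.forward_tcp_request;
        # its exact schema is defined by ClientSideOcrdMets and MetsServerProxy, not here.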
        ws_dir_path = request_body["workspace_path"]
        self.deployer.start_uds_mets_server(ws_dir_path=ws_dir_path)
        return self.mets_server_proxy.forward_tcp_request(request_body=request_body)

    async def home_page(self):
        message = f"The home page of the {self.title}"
        json_message = {
            "message": message,
            "time": datetime.now().strftime("%Y-%m-%d %H:%M")
        }
        return json_message

    async def stop_deployed_agents(self) -> None:
        self.deployer.stop_all()

    async def get_worker_ocrd_tool(self, processor_name: str) -> Dict:
        ocrd_tool = {}
        ocrd_tool = self.ocrd_all_tool_json.get(processor_name, None)
        if not ocrd_tool:
            raise_http_exception(self.log, status.HTTP_404_NOT_FOUND,
                                 f"Processing Worker '{processor_name}' not found.")
        return ocrd_tool

    def exists_worker(self, processor_name: str) -> bool:
        # TODO: Reconsider and refactor this.
        # Added ocrd-dummy by default if not available for the integration tests.
        # A proper Processing Worker registration endpoint is needed on the Processing Server side
        if processor_name == 'ocrd-dummy':
            return True
        return bool(check_if_queue_exists(self.log, self.rmq_data, processor_name=processor_name))

    def validate_worker_existence(self, processor_name: str) -> None:
        worker_exists = self.exists_worker(processor_name=processor_name)
        if not worker_exists:
            message = f"Processing Worker '{processor_name}' not found."
            raise_http_exception(self.log, status.HTTP_422_UNPROCESSABLE_ENTITY, message)

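    # Illustrative sketch of a job request as accepted by POST /processor/run/{processor_name}.
    # Field names follow PYJobInput as used below; the values are made-up examples, not defaults:
    #
    #   {
    #       "path_to_mets": "/data/ws1/mets.xml",
    #       "input_file_grps": ["OCR-D-IMG"],
    #       "output_file_grps": ["OCR-D-BIN"],
    #       "page_id": "PHYS_0001..PHYS_0005",
    #       "parameters": {},
    #       "depends_on": []
    #   }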
    async def validate_and_forward_job_to_worker(self, processor_name: str, data: PYJobInput) -> PYJobOutput:
        # Append the processor name to the request itself
        data.processor_name = processor_name
        self.validate_worker_existence(processor_name=data.processor_name)
        if data.job_id:
            message = f"Processing request job id field is set but must not be: {data.job_id}"
            raise_http_exception(self.log, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
        # Generate processing job id
        data.job_id = generate_id()
        ocrd_tool = await self.get_worker_ocrd_tool(processor_name=data.processor_name)
        validate_job_input(self.log, data.processor_name, ocrd_tool, data)

        if data.workspace_id:
            # just a check whether the workspace exists in the database or not
            await get_from_database_workspace(self.log, data.workspace_id)
        else:  # data.path_to_mets provided instead
            await create_workspace_if_not_exists(self.log, mets_path=data.path_to_mets)

        workspace_key = data.path_to_mets if data.path_to_mets else data.workspace_id
        # initialize the request counter for the workspace_key
        self.cache_processing_requests.update_request_counter(workspace_key=workspace_key, by_value=0)

        # This check is done to return early in case a workspace_id is provided
        # but the abs mets path cannot be queried from the DB
        request_mets_path = await validate_and_return_mets_path(self.log, data)

        page_ids = expand_page_ids(data.page_id)

        # A flag whether the current request must be cached
        # This is set to true if for any output file group there
        # is a page_id value that has been previously locked
        cache_current_request = False

        # Check if there are any dependencies of the current request
        if data.depends_on:
            cache_current_request = await self.cache_processing_requests.is_caching_required(data.depends_on)

        # No need for further check of locked pages dependency
        # if the request should be already cached
        if not cache_current_request:
            # Check if there are any locked pages for the current request
            cache_current_request = self.cache_locked_pages.check_if_locked_pages_for_output_file_grps(
                workspace_key=workspace_key,
                output_file_grps=data.output_file_grps,
                page_ids=page_ids
            )

        if cache_current_request:
            # Cache the received request
            self.cache_processing_requests.cache_request(workspace_key, data)

            # Create a cached job DB entry
            db_cached_job = DBProcessorJob(
                **data.dict(exclude_unset=True, exclude_none=True),
                internal_callback_url=self.internal_job_callback_url,
                state=JobState.cached
            )
            await db_cached_job.insert()
            return db_cached_job.to_job_output()

        # Lock the pages in the request
        self.cache_locked_pages.lock_pages(
            workspace_key=workspace_key,
            output_file_grps=data.output_file_grps,
            page_ids=page_ids
        )

        # Start a UDS Mets Server with the current workspace
        ws_dir_path = str(Path(request_mets_path).parent)
        mets_server_url = self.deployer.start_uds_mets_server(ws_dir_path=ws_dir_path)
        if self.use_tcp_mets:
            # let workers talk to mets server via tcp instead of using unix-socket
            mets_server_url = self.multiplexing_endpoint

        # Assign the mets server url in the database (workers read mets_server_url from db)
        await db_update_workspace(
            workspace_id=data.workspace_id,
            workspace_mets_path=data.path_to_mets,
            mets_server_url=mets_server_url
        )

        # Create a queued job DB entry
        db_queued_job = DBProcessorJob(
            **data.dict(exclude_unset=True, exclude_none=True),
            internal_callback_url=self.internal_job_callback_url,
            state=JobState.queued
        )
        await db_queued_job.insert()
        self.cache_processing_requests.update_request_counter(workspace_key=workspace_key, by_value=1)
        job_output = await self.push_job_to_worker(data=data, db_job=db_queued_job)
        return job_output

    async def push_job_to_worker(self, data: PYJobInput, db_job: DBProcessorJob) -> PYJobOutput:
        job_output = None
        self.log.debug(f"Pushing to Processing Worker: {data.processor_name}, {data.page_id}, {data.job_id}")
        job_output = await self.push_job_to_processing_queue(db_job=db_job)
        if not job_output:
            message = f"Failed to create job output for job input: {data}"
            raise_http_exception(self.log, status.HTTP_500_INTERNAL_SERVER_ERROR, message)
        return job_output

    async def push_job_to_processing_queue(self, db_job: DBProcessorJob) -> PYJobOutput:
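        # The job is wrapped into an OcrdProcessingMessage, YAML-encoded, and published to the
        # RabbitMQ queue named after the processor, where a Processing Worker consuming that
        # queue picks it up.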
        if not self.rmq_publisher:
            message = "The Processing Server has no connection to RabbitMQ Server. RMQPublisher is not connected."
            raise_http_exception(self.log, status.HTTP_500_INTERNAL_SERVER_ERROR, message)
        processing_message = create_processing_message(self.log, db_job)
        try:
            encoded_message = OcrdProcessingMessage.encode_yml(processing_message)
            self.rmq_publisher.publish_to_queue(queue_name=db_job.processor_name, message=encoded_message)
        except Exception as error:
            message = (
                f"Processing server has failed to push processing message to queue: {db_job.processor_name}, "
                f"Processing message: {processing_message.__dict__}"
            )
            raise_http_exception(self.log, status.HTTP_500_INTERNAL_SERVER_ERROR, message, error)
        return db_job.to_job_output()

    async def get_processor_job(self, job_id: str) -> PYJobOutput:
        return await _get_processor_job(self.log, job_id)

    async def get_processor_job_log(self, job_id: str) -> FileResponse:
        return await _get_processor_job_log(self.log, job_id)

    async def _lock_pages_of_workspace(
        self, workspace_key: str, output_file_grps: List[str], page_ids: List[str]
    ) -> None:
        # Lock the output file group pages for the current request
        self.cache_locked_pages.lock_pages(
            workspace_key=workspace_key,
            output_file_grps=output_file_grps,
            page_ids=page_ids
        )

    async def _unlock_pages_of_workspace(
        self, workspace_key: str, output_file_grps: List[str], page_ids: List[str]
    ) -> None:
        self.cache_locked_pages.unlock_pages(
            workspace_key=workspace_key,
            output_file_grps=output_file_grps,
            page_ids=page_ids
        )

    async def push_cached_jobs_to_workers(self, processing_jobs: List[PYJobInput]) -> None:
        if not len(processing_jobs):
            self.log.debug("No processing jobs were consumed from the requests cache")
            return
        for data in processing_jobs:
            self.log.info(f"Changing the job status of: {data.job_id} from {JobState.cached} to {JobState.queued}")
            db_consumed_job = await db_update_processing_job(job_id=data.job_id, state=JobState.queued)
            workspace_key = data.path_to_mets if data.path_to_mets else data.workspace_id

            # Lock the output file group pages for the current request
            await self._lock_pages_of_workspace(
                workspace_key=workspace_key,
                output_file_grps=data.output_file_grps,
                page_ids=expand_page_ids(data.page_id)
            )

            self.cache_processing_requests.update_request_counter(workspace_key=workspace_key, by_value=1)
            job_output = await self.push_job_to_worker(data=data, db_job=db_consumed_job)
            if not job_output:
                self.log.exception(f"Failed to create job output for job input data: {data}")

    async def _cancel_cached_dependent_jobs(self, workspace_key: str, job_id: str) -> None:
        await self.cache_processing_requests.cancel_dependent_jobs(
            workspace_key=workspace_key,
            processing_job_id=job_id
        )

    async def _consume_cached_jobs_of_workspace(
        self, workspace_key: str, mets_server_url: str, path_to_mets: str
    ) -> List[PYJobInput]:
        # decrease the internal cache counter by 1
        request_counter = self.cache_processing_requests.update_request_counter(
            workspace_key=workspace_key, by_value=-1
        )
        self.log.debug(f"Internal processing job cache counter value: {request_counter}")
        if (workspace_key not in self.cache_processing_requests.processing_requests or
                not len(self.cache_processing_requests.processing_requests[workspace_key])):
            if request_counter <= 0:
                # Shut down the Mets Server for the workspace_key since no
                # more internal callbacks are expected for that workspace
                self.log.debug(f"Stopping the mets server: {mets_server_url}")
                self.deployer.stop_uds_mets_server(mets_server_url=mets_server_url, path_to_mets=path_to_mets)

                try:
                    # The queue is empty - delete it
                    del self.cache_processing_requests.processing_requests[workspace_key]
                except KeyError:
                    self.log.warning(f"Trying to delete non-existing internal queue with key: {workspace_key}")

                # For debugging purposes it is good to see if any locked pages are left
                self.log.debug(f"Contents of the locked pages cache for: {workspace_key}")
                locked_pages = self.cache_locked_pages.get_locked_pages(workspace_key=workspace_key)
                for output_file_grp in locked_pages:
                    self.log.debug(f"{output_file_grp}: {locked_pages[output_file_grp]}")
            else:
                self.log.debug(f"Internal request cache is empty but waiting for {request_counter} result callbacks.")
            return []
        # Check whether the internal queue for the workspace key still exists
        if workspace_key not in self.cache_processing_requests.processing_requests:
            self.log.debug(f"No internal queue available for workspace with key: {workspace_key}")
            return []
        consumed_requests = await self.cache_processing_requests.consume_cached_requests(workspace_key=workspace_key)
        return consumed_requests

    async def remove_job_from_request_cache(self, result_message: PYResultMessage):
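        # Result callback handler (registered under /result_callback): a worker reports the
        # outcome of a job here. The handler unlocks the pages held by that job, pushes any
        # cached jobs that were waiting on it, and lets the workspace mets server be stopped
        # once no further callbacks are expected for that workspace.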
        result_job_id = result_message.job_id
        result_job_state = result_message.state
        path_to_mets = result_message.path_to_mets
        workspace_id = result_message.workspace_id
        self.log.info(f"Result job_id: {result_job_id}, state: {result_job_state}")

        db_workspace = await get_from_database_workspace(self.log, workspace_id, path_to_mets)
        mets_server_url = db_workspace.mets_server_url
        workspace_key = path_to_mets if path_to_mets else workspace_id

        if result_job_state == JobState.failed:
            await self._cancel_cached_dependent_jobs(workspace_key, result_job_id)

        if result_job_state != JobState.success:
            # TODO: Handle other potential error cases
            pass

        try:
            db_result_job = await db_get_processing_job(result_job_id)
            # Unlock the output file group pages for the result processing request
            await self._unlock_pages_of_workspace(
                workspace_key=workspace_key,
                output_file_grps=db_result_job.output_file_grps,
                page_ids=expand_page_ids(db_result_job.page_id)
            )
        except ValueError as error:
            message = f"Processing result job with id '{result_job_id}' not found in the DB."
            raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message, error)

        consumed_cached_jobs = await self._consume_cached_jobs_of_workspace(
            workspace_key=workspace_key, mets_server_url=mets_server_url, path_to_mets=path_to_mets
        )
        await self.push_cached_jobs_to_workers(processing_jobs=consumed_cached_jobs)

    async def list_processors(self) -> List[str]:
        return get_message_queues(self.log, self.rmq_data)

    async def task_sequence_to_processing_jobs(
        self,
        tasks: List[ProcessorTask],
        mets_path: str,
        page_id: str,
    ) -> List[PYJobOutput]:
        temp_file_group_cache = {}
        responses = []
        for task in tasks:
            # Find dependent jobs of the current task
            dependent_jobs = []
            for input_file_grp in task.input_file_grps:
                if input_file_grp in temp_file_group_cache:
                    dependent_jobs.append(temp_file_group_cache[input_file_grp])
            # NOTE: `task.mets_path` and `task.page_id` are not utilized at the lower level.
            # Thus, setting these two fields in the ocrd process workflow file has no effect.
            job_input_data = PYJobInput(
                processor_name=task.executable,
                path_to_mets=mets_path,
                input_file_grps=task.input_file_grps,
                output_file_grps=task.output_file_grps,
                page_id=page_id,
                parameters=task.parameters,
                depends_on=dependent_jobs,
            )
            response = await self.validate_and_forward_job_to_worker(
                processor_name=job_input_data.processor_name,
                data=job_input_data
            )
            for file_group in task.output_file_grps:
                temp_file_group_cache[file_group] = response.job_id
            responses.append(response)
        return responses

    def validate_tasks_worker_existence(self, tasks: List[ProcessorTask]) -> None:
        missing_workers = []
        for task in tasks:
            try:
                self.validate_worker_existence(processor_name=task.executable)
            except HTTPException:
                # catching the error is not relevant here
                missing_workers.append(task.executable)
        if missing_workers:
            message = (
                "Workflow validation has failed. The desired Processing Workers were not found. "
                f"Missing Processing Workers: {missing_workers}"
            )
            raise_http_exception(self.log, status.HTTP_406_NOT_ACCEPTABLE, message)

    async def run_workflow(
        self,
        mets_path: str,
        workflow: Union[UploadFile, str, None] = File(None),
        workflow_id: str = None,
        page_id: str = None,
        page_wise: bool = False,
        workflow_callback_url: str = None
    ) -> PYWorkflowJobOutput:
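        # When page_wise is False, the whole task sequence is submitted once for the given
        # page range; when True, the sequence is submitted separately for every page id and
        # the resulting job ids are grouped per page in the workflow job entry.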
        await create_workspace_if_not_exists(self.log, mets_path=mets_path)
        workflow_content = await get_workflow_content(self.log, workflow_id, workflow)
        processing_tasks = parse_workflow_tasks(self.log, workflow_content)

        # Validate the input file groups of the first task in the workflow
        validate_first_task_input_file_groups_existence(self.log, mets_path, processing_tasks[0].input_file_grps)

        # Validate existence of Processing Workers
        # for the ocr-d processors referenced inside tasks
        self.validate_tasks_worker_existence(processing_tasks)

        # for page_wise mode, we need to expand the list of pages
        # for the database, it's better to keep a short string
        page_id = page_id or ''
        page_ids = get_page_ids_list(self.log, mets_path, page_id)

        if not page_wise:
            responses = await self.task_sequence_to_processing_jobs(
                tasks=processing_tasks,
                mets_path=mets_path,
                page_id=page_id,
            )
            processing_job_ids = [response.job_id for response in responses]
            db_workflow_job = DBWorkflowJob(
                job_id=generate_id(),
                page_id=page_id,
                page_wise=page_wise,
                processing_job_ids={page_id: processing_job_ids},
                path_to_mets=mets_path,
                workflow_callback_url=workflow_callback_url
            )
            await db_workflow_job.insert()
            return db_workflow_job.to_job_output()

        all_pages_job_ids = {}
        for current_page in page_ids:
            responses = await self.task_sequence_to_processing_jobs(
                tasks=processing_tasks,
                mets_path=mets_path,
                page_id=current_page,
            )
            processing_job_ids = [response.job_id for response in responses]
            all_pages_job_ids[current_page] = processing_job_ids
        db_workflow_job = DBWorkflowJob(
            job_id=generate_id(),
            page_id=page_id,
            page_wise=page_wise,
            processing_job_ids=all_pages_job_ids,
            path_to_mets=mets_path,
            workflow_callback_url=workflow_callback_url
        )
        await db_workflow_job.insert()
        return db_workflow_job.to_job_output()

    @staticmethod
    def _produce_workflow_status_response(processing_jobs: List[DBProcessorJob]) -> Dict:
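        # Aggregates job counts per processor name and per JobState value; failed jobs are
        # additionally listed under the "failed-processor-tasks" key. Illustrative shape only
        # (processor name and state spellings below are placeholders, not guaranteed values):
        #   {"ocrd-some-processor": {"SUCCESS": 2, "FAILED": 1},
        #    "failed-processor-tasks": {"ocrd-some-processor": [{"job_id": "...", "page_id": "..."}]}}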
        response = {}
        failed_tasks = {}
        failed_tasks_key = "failed-processor-tasks"
        for p_job in processing_jobs:
            response.setdefault(p_job.processor_name, {})
            response[p_job.processor_name].setdefault(p_job.state.value, 0)
            response[p_job.processor_name][p_job.state.value] += 1
            if p_job.state == JobState.failed:
                if failed_tasks_key not in response:
                    response[failed_tasks_key] = failed_tasks
                failed_tasks.setdefault(p_job.processor_name, [])
                failed_tasks[p_job.processor_name].append(
                    {"job_id": p_job.job_id, "page_id": p_job.page_id}
                )
        return response

    @staticmethod
    def _produce_workflow_status_simple_response(processing_jobs: List[DBProcessorJob]) -> JobState:
        workflow_job_state = JobState.unset
        success_jobs = 0
        for p_job in processing_jobs:
            if p_job.state == JobState.cached or p_job.state == JobState.queued:
                continue
            if p_job.state == JobState.failed or p_job.state == JobState.cancelled:
                workflow_job_state = JobState.failed
                break
            if p_job.state == JobState.running:
                workflow_job_state = JobState.running
            if p_job.state == JobState.success:
                success_jobs += 1
        if len(processing_jobs) == success_jobs:
            workflow_job_state = JobState.success
        return workflow_job_state

    async def get_workflow_info(self, workflow_job_id) -> Dict:
        """ Return list of a workflow's processor jobs
        """
        workflow_job = await get_from_database_workflow_job(self.log, workflow_job_id)
        job_ids: List[str] = [job_id for lst in workflow_job.processing_job_ids.values() for job_id in lst]
        jobs = await db_get_processing_jobs(job_ids)
        response = self._produce_workflow_status_response(processing_jobs=jobs)
        return response

    async def kill_mets_server_zombies(self, minutes_ago: Optional[int] = None, dry_run: Optional[bool] = None) -> List[int]:
        pids_killed = kill_mets_server_zombies(minutes_ago=minutes_ago, dry_run=dry_run)
        return pids_killed

    async def get_workflow_info_simple(self, workflow_job_id) -> Dict[str, JobState]:
        """
        Simplified version of `get_workflow_info` that returns a single state for the entire workflow.
        - If a single processing job fails, the entire workflow job status is set to FAILED.
        - If any processing jobs are running, regardless of other states such as QUEUED and CACHED,
          the entire workflow job status is set to RUNNING.
        - Only if all processing jobs have finished successfully is the workflow job status set to SUCCESS.
        """
        workflow_job = await get_from_database_workflow_job(self.log, workflow_job_id)
        job_ids: List[str] = [job_id for lst in workflow_job.processing_job_ids.values() for job_id in lst]
        jobs = await db_get_processing_jobs(job_ids)
        workflow_job_state = self._produce_workflow_status_simple_response(processing_jobs=jobs)
        return {"state": workflow_job_state}

    async def upload_workflow(self, workflow: Union[UploadFile, str]) -> Dict[str, str]:
        """ Store a script for a workflow in the database
        """
        if isinstance(workflow, str):
            with open(workflow) as wf_file:
                workflow_content = wf_file.read()
        else:
            workflow_content = await generate_workflow_content(workflow)
        validate_workflow(self.log, workflow_content)
        content_hash = generate_workflow_content_hash(workflow_content)
        try:
            db_workflow_script = await db_find_first_workflow_script_by_content(content_hash)
            if db_workflow_script:
                message = f"The same workflow script already exists, workflow id: {db_workflow_script.workflow_id}"
                raise_http_exception(self.log, status.HTTP_409_CONFLICT, message)
        except ValueError:
            pass
        workflow_id = generate_id()
        db_workflow_script = DBWorkflowScript(
            workflow_id=workflow_id,
            content=workflow_content,
            content_hash=content_hash
        )
        await db_workflow_script.insert()
        return {"workflow_id": workflow_id}

    async def replace_workflow(self, workflow_id, workflow: Union[UploadFile, str]) -> Dict[str, str]:
        """ Update a workflow script file in the database
        """
        try:
            db_workflow_script = await db_get_workflow_script(workflow_id)
            if isinstance(workflow, str):
                with open(workflow) as wf_file:
                    workflow_content = wf_file.read()
            else:
                workflow_content = await generate_workflow_content(workflow)
            validate_workflow(self.log, workflow_content)
            db_workflow_script.content = workflow_content
            content_hash = generate_workflow_content_hash(workflow_content)
            db_workflow_script.content_hash = content_hash
            await db_workflow_script.save()
            return {"workflow_id": db_workflow_script.workflow_id}
        except ValueError as error:
            message = f"Workflow script not found for id '{workflow_id}'."
            raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message, error)

    async def download_workflow(self, workflow_id) -> PlainTextResponse:
        """ Load a workflow script from the database
        """
        try:
            workflow = await db_get_workflow_script(workflow_id)
            return PlainTextResponse(workflow.content)
        except ValueError as error:
            message = f"Workflow script not found for id '{workflow_id}'."
            raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message, error)