1
|
|
|
from datetime import datetime |
2
|
|
|
from os import getpid |
3
|
|
|
from pathlib import Path |
4
|
|
|
from typing import Dict, List, Optional, Union |
5
|
|
|
from uvicorn import run as uvicorn_run |
6
|
|
|
|
7
|
|
|
from fastapi import APIRouter, FastAPI, File, HTTPException, Request, status, UploadFile |
8
|
|
|
from fastapi.exceptions import RequestValidationError |
9
|
|
|
from fastapi.responses import FileResponse, JSONResponse, PlainTextResponse |
10
|
|
|
|
11
|
|
|
from ocrd.task_sequence import ProcessorTask |
12
|
|
|
from ocrd_utils import initLogging, getLogger |
13
|
|
|
from .constants import JobState, ServerApiTags |
14
|
|
|
from .database import ( |
15
|
|
|
initiate_database, |
16
|
|
|
db_get_processing_job, |
17
|
|
|
db_get_processing_jobs, |
18
|
|
|
db_update_processing_job, |
19
|
|
|
db_update_workspace, |
20
|
|
|
db_get_workflow_script, |
21
|
|
|
db_find_first_workflow_script_by_content |
22
|
|
|
) |
23
|
|
|
from .runtime_data import Deployer |
24
|
|
|
from .logging_utils import configure_file_handler_with_formatter, get_processing_server_logging_file_path |
25
|
|
|
from .models import ( |
26
|
|
|
DBProcessorJob, |
27
|
|
|
DBWorkflowJob, |
28
|
|
|
DBWorkflowScript, |
29
|
|
|
PYJobInput, |
30
|
|
|
PYJobOutput, |
31
|
|
|
PYResultMessage, |
32
|
|
|
PYWorkflowJobOutput |
33
|
|
|
) |
34
|
|
|
from .rabbitmq_utils import ( |
35
|
|
|
check_if_queue_exists, |
36
|
|
|
connect_rabbitmq_publisher, |
37
|
|
|
get_message_queues, |
38
|
|
|
OcrdProcessingMessage |
39
|
|
|
) |
40
|
|
|
from .server_cache import CacheLockedPages, CacheProcessingRequests |
41
|
|
|
from .server_utils import ( |
42
|
|
|
create_processing_message, |
43
|
|
|
create_workspace_if_not_exists, |
44
|
|
|
_get_processor_job, |
45
|
|
|
_get_processor_job_log, |
46
|
|
|
get_page_ids_list, |
47
|
|
|
get_workflow_content, |
48
|
|
|
get_from_database_workspace, |
49
|
|
|
get_from_database_workflow_job, |
50
|
|
|
kill_mets_server_zombies, |
51
|
|
|
parse_workflow_tasks, |
52
|
|
|
raise_http_exception, |
53
|
|
|
validate_and_return_mets_path, |
54
|
|
|
validate_first_task_input_file_groups_existence, |
55
|
|
|
validate_job_input, |
56
|
|
|
validate_workflow |
57
|
|
|
) |
58
|
|
|
from .tcp_to_uds_mets_proxy import MetsServerProxy |
59
|
|
|
from .utils import ( |
60
|
|
|
load_ocrd_all_tool_json, |
61
|
|
|
expand_page_ids, |
62
|
|
|
generate_id, |
63
|
|
|
generate_workflow_content, |
64
|
|
|
generate_workflow_content_hash |
65
|
|
|
) |
66
|
|
|
|
67
|
|
|
|
68
|
|
|
class ProcessingServer(FastAPI): |
69
|
|
|
"""FastAPI app to make ocr-d processor calls |
70
|
|
|
|
71
|
|
|
The Processing-Server receives calls conforming to the ocr-d webapi regarding the processing |
72
|
|
|
part. It can run ocrd-processors and provides endpoints to discover processors and watch the job |
73
|
|
|
status. |
74
|
|
|
The Processing-Server does not execute the processors itself but starts up a queue and a |
75
|
|
|
database to delegate the calls to processing workers. They are started by the Processing-Server |
76
|
|
|
and the communication goes through the queue. |
77
|
|
|
""" |
78
|
|
|
|
79
|
|
|
def __init__(self, config_path: str, host: str, port: int) -> None:
    """Set up logging, runtime configuration, caches and the API routes.

    The `deploy_*` methods of the deployer are not called here; `self.mongodb_url`,
    `self.rabbitmq_url` and `self.rmq_publisher` stay `None` until `start()` assigns them.

    Args:
        config_path: configuration path handed to `Deployer`
        host: host name the server will be served on
        port: port the server will be served on
    """
    self.title = "OCR-D Processing Server"
    super().__init__(
        title=self.title,
        on_startup=[self.on_startup],
        on_shutdown=[self.on_shutdown],
        description="OCR-D Processing Server"
    )
    initLogging()
    self.log = getLogger("ocrd_network.processing_server")
    log_file = get_processing_server_logging_file_path(pid=getpid())
    configure_file_handler_with_formatter(self.log, log_file=log_file, mode="a")

    self.log.info("Loading ocrd-all-tool.json")
    self.ocrd_all_tool_json = load_ocrd_all_tool_json()
    self.hostname = host
    self.port = port

    # The deployer is used for:
    # - deploying agents when the Processing Server is started
    # - retrieving runtime data of agents
    self.deployer = Deployer(config_path)
    # Used for forwarding Mets Server TCP requests to UDS requests
    self.mets_server_proxy = MetsServerProxy()
    # If set, all Mets Server UDS requests are multiplexed over TCP
    self.use_tcp_mets = self.deployer.use_tcp_mets

    # Base URL under which the workers reach this server. A configured internal
    # callback URL takes precedence over the host/port the server is bound to.
    internal_callback_url = self.deployer.internal_callback_url
    if internal_callback_url:
        base_url = internal_callback_url.rstrip('/')
    else:
        base_url = f"http://{host}:{port}"
    # Used by processing workers to report back the results
    self.internal_job_callback_url = f"{base_url}/result_callback"
    self.multiplexing_endpoint = f"{base_url}/tcp_mets"

    self.mongodb_url = None
    self.rabbitmq_url = None
    self.rmq_data = {
        "host": self.deployer.data_queue.host,
        "port": self.deployer.data_queue.port,
        "vhost": "/",
        "username": self.deployer.data_queue.cred_username,
        "password": self.deployer.data_queue.cred_password
    }

    # Gets assigned when `connect_rabbitmq_publisher()` is called on the working object
    self.rmq_publisher = None

    # Used for keeping track of cached processing requests
    self.cache_processing_requests = CacheProcessingRequests()

    # Used for keeping track of locked/unlocked pages of a workspace
    self.cache_locked_pages = CacheLockedPages()

    self.add_api_routes_others()
    self.add_api_routes_processing()
    self.add_api_routes_workflow()

    @self.exception_handler(RequestValidationError)
    async def validation_exception_handler(request: Request, exc: RequestValidationError):
        # Flatten the validation error into one line: newlines and runs of
        # whitespace are collapsed to single spaces.
        exc_str = ' '.join(f'{exc}'.split())
        self.log.error(f'{request}: {exc_str}')
        content = {'status_code': 10422, 'message': exc_str, 'data': None}
        return JSONResponse(content=content, status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)
143
|
|
|
|
144
|
|
|
def start(self) -> None:
    """Deploy the agents (queue, database, workers) and serve the app with uvicorn.

    If a deployment step or the publisher connection raises, the error is logged,
    `stop_all()` is called on the deployer and the exception is re-raised; uvicorn
    is not started in that case.
    """
    deployer = self.deployer
    try:
        self.rabbitmq_url = deployer.deploy_rabbitmq()
        self.mongodb_url = deployer.deploy_mongodb()

        # Connect the RabbitMQ publisher before the workers get deployed
        self.rmq_publisher = connect_rabbitmq_publisher(self.log, self.rmq_data, enable_acks=True)

        deployer.deploy_workers(mongodb_url=self.mongodb_url, rabbitmq_url=self.rabbitmq_url)
    except Exception as error:
        self.log.exception(f"Failed to start the Processing Server, error: {error}")
        self.log.warning("Trying to stop previously deployed services and workers.")
        deployer.stop_all()
        raise
    uvicorn_run(self, host=self.hostname, port=int(self.port))
161
|
|
|
|
162
|
|
|
async def on_startup(self):
    """Startup hook registered in `__init__`: initialise the database at `self.mongodb_url`."""
    db_url = self.mongodb_url
    self.log.info(f"Initializing the Database on: {db_url}")
    await initiate_database(db_url=db_url)
165
|
|
|
|
166
|
|
|
async def on_shutdown(self) -> None:
    """Shutdown hook registered in `__init__`: stop the deployed agents.

    Open points carried over from the original notes:
    - hosts and pids should be stored somewhere
    - ensure queue is empty or processor is not currently running
    - connect to hosts and kill pids
    """
    await self.stop_deployed_agents()
173
|
|
|
|
174
|
|
|
def add_api_routes_others(self):
    """Register the general endpoints: `/`, `/stop`, `/tcp_mets` and `/kill_mets_server_zombies`."""
    router = APIRouter()
    # One entry per route, registered in exactly this order
    route_specs = (
        dict(
            path="/",
            endpoint=self.home_page,
            methods=["GET"],
            status_code=status.HTTP_200_OK,
            summary="Get information about the processing server"
        ),
        dict(
            path="/stop",
            endpoint=self.stop_deployed_agents,
            methods=["POST"],
            tags=[ServerApiTags.TOOLS],
            summary="Stop database, queue and processing-workers"
        ),
        dict(
            path="/tcp_mets",
            methods=["POST"],
            endpoint=self.forward_tcp_request_to_uds_mets_server,
            tags=[ServerApiTags.WORKSPACE],
            summary="Forward a TCP request to UDS mets server"
        ),
        dict(
            path="/kill_mets_server_zombies",
            endpoint=self.kill_mets_server_zombies,
            methods=["DELETE"],
            tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="!! Workaround Do Not Use Unless You Have A Reason "
                    "!! Kill all METS servers on this machine that have been created more than 60 minutes ago."
        ),
    )
    for route_spec in route_specs:
        router.add_api_route(**route_spec)
    self.include_router(router)
207
|
|
|
|
208
|
|
|
def add_api_routes_processing(self):
    """Register the processor endpoints (discovery, run, job status, job log) and `/result_callback`."""
    router = APIRouter()
    # One entry per route, registered in exactly this order
    route_specs = (
        dict(
            path="/processor",
            endpoint=self.list_processors,
            methods=["GET"],
            tags=[ServerApiTags.PROCESSING, ServerApiTags.DISCOVERY],
            status_code=status.HTTP_200_OK,
            summary="Get a list of all available processors"
        ),
        dict(
            path="/processor/info/{processor_name}",
            endpoint=self.get_worker_ocrd_tool,
            methods=["GET"],
            tags=[ServerApiTags.PROCESSING, ServerApiTags.DISCOVERY],
            status_code=status.HTTP_200_OK,
            summary="Get information about this processor"
        ),
        dict(
            path="/processor/run/{processor_name}",
            endpoint=self.validate_and_forward_job_to_worker,
            methods=["POST"],
            tags=[ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Submit a job to this processor",
            response_model=PYJobOutput,
            response_model_exclude_unset=True,
            response_model_exclude_none=True
        ),
        dict(
            path="/processor/job/{job_id}",
            endpoint=self.get_processor_job,
            methods=["GET"],
            tags=[ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Get information about a job based on its ID",
            response_model=PYJobOutput,
            response_model_exclude_unset=True,
            response_model_exclude_none=True
        ),
        dict(
            path="/processor/log/{job_id}",
            endpoint=self.get_processor_job_log,
            methods=["GET"],
            tags=[ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Get the log file of a job id"
        ),
        dict(
            path="/result_callback",
            endpoint=self.remove_job_from_request_cache,
            methods=["POST"],
            tags=[ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Callback used by a worker for reporting result of a processing request"
        ),
    )
    for route_spec in route_specs:
        router.add_api_route(**route_spec)
    self.include_router(router)
265
|
|
|
|
266
|
|
|
def add_api_routes_workflow(self):
    """Register the workflow endpoints: upload, download, replace, run and the two job-status views."""
    router = APIRouter()
    # One entry per route, registered in exactly this order
    route_specs = (
        dict(
            path="/workflow",
            endpoint=self.upload_workflow,
            methods=["POST"],
            tags=[ServerApiTags.WORKFLOW],
            status_code=status.HTTP_201_CREATED,
            summary="Upload/Register a new workflow script"
        ),
        dict(
            path="/workflow/{workflow_id}",
            endpoint=self.download_workflow,
            methods=["GET"],
            tags=[ServerApiTags.WORKFLOW],
            status_code=status.HTTP_200_OK,
            summary="Download a workflow script"
        ),
        dict(
            path="/workflow/{workflow_id}",
            endpoint=self.replace_workflow,
            methods=["PUT"],
            tags=[ServerApiTags.WORKFLOW],
            status_code=status.HTTP_200_OK,
            summary="Update/Replace a workflow script"
        ),
        dict(
            path="/workflow/run",
            endpoint=self.run_workflow,
            methods=["POST"],
            tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Run a workflow",
            response_model=PYWorkflowJobOutput,
            response_model_exclude_defaults=True,
            response_model_exclude_unset=True,
            response_model_exclude_none=True
        ),
        dict(
            path="/workflow/job-simple/{workflow_job_id}",
            endpoint=self.get_workflow_info_simple,
            methods=["GET"],
            tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Get simplified overall job status"
        ),
        dict(
            path="/workflow/job/{workflow_job_id}",
            endpoint=self.get_workflow_info,
            methods=["GET"],
            tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
            status_code=status.HTTP_200_OK,
            summary="Get information about a workflow run"
        ),
    )
    for route_spec in route_specs:
        router.add_api_route(**route_spec)
    self.include_router(router)
321
|
|
|
|
322
|
|
|
async def forward_tcp_request_to_uds_mets_server(self, request: Request) -> Dict:
    """Forward a METS Server request received over TCP to the UDS METS Server.

    A processor calls a METS-related method such as add_file through ClientSideOcrdMets,
    which sends a request to this endpoint. The request body contains all information
    needed to call the UDS METS Server. `MetsServerProxy` uses that information to make
    the call to the UDS METS Server that is reachable locally from the Processing Server.

    The JSON body must contain the key "workspace_path".
    """
    payload = await request.json()
    # Called on every request before forwarding.
    # NOTE(review): presumably a no-op when a server for the workspace is already running - confirm
    self.deployer.start_uds_mets_server(ws_dir_path=payload["workspace_path"])
    return self.mets_server_proxy.forward_tcp_request(request_body=payload)
334
|
|
|
|
335
|
|
|
async def home_page(self):
    """Return a short greeting and the current local time formatted as `%Y-%m-%d %H:%M`."""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M")
    return {
        "message": f"The home page of the {self.title}",
        "time": timestamp
    }
342
|
|
|
|
343
|
|
|
async def stop_deployed_agents(self) -> None:
    """Stop whatever the deployer has started by delegating to its `stop_all()`."""
    deployer = self.deployer
    deployer.stop_all()
345
|
|
|
|
346
|
|
|
async def get_worker_ocrd_tool(self, processor_name: str) -> Dict:
    """Return the entry of `processor_name` from the loaded ocrd-all-tool.json.

    A missing or empty entry is reported through `raise_http_exception` with
    HTTP 404 (presumably raising `HTTPException`).
    """
    ocrd_tool = self.ocrd_all_tool_json.get(processor_name)
    if not ocrd_tool:
        raise_http_exception(self.log, status.HTTP_404_NOT_FOUND,
                             f"Processing Worker '{processor_name}' not found.")
    return ocrd_tool
353
|
|
|
|
354
|
|
|
def exists_worker(self, processor_name: str) -> bool:
    """Tell whether a worker for `processor_name` is considered available.

    'ocrd-dummy' always counts as available; for any other name the result of
    `check_if_queue_exists` for that name decides.
    """
    # TODO: Reconsider and refactor this.
    #  Added ocrd-dummy by default if not available for the integration tests.
    #  A proper Processing Worker registration endpoint is needed on the Processing Server side
    return processor_name == 'ocrd-dummy' or bool(
        check_if_queue_exists(self.log, self.rmq_data, processor_name=processor_name)
    )
361
|
|
|
|
362
|
|
|
def validate_worker_existence(self, processor_name: str) -> None:
    """Report HTTP 422 through `raise_http_exception` unless `exists_worker` accepts the name."""
    if self.exists_worker(processor_name=processor_name):
        return
    raise_http_exception(
        self.log,
        status.HTTP_422_UNPROCESSABLE_ENTITY,
        f"Processing Worker '{processor_name}' not found."
    )
367
|
|
|
|
368
|
|
|
async def validate_and_forward_job_to_worker(self, processor_name: str, data: PYJobInput) -> PYJobOutput:
    """Validate a processing request, then either cache it or queue it for a worker.

    The request is cached (DB state `JobState.cached`) when the request cache says
    one of its `depends_on` jobs requires caching, or when pages of its output file
    groups are currently locked. Otherwise its pages are locked, a UDS METS Server
    is started for the workspace, the job is stored as `JobState.queued` and pushed
    to the worker queue.

    `data.job_id` must be unset on entry; it is generated here.
    """
    # The processor name comes from the route; store it on the request itself
    data.processor_name = processor_name
    self.validate_worker_existence(processor_name=data.processor_name)
    if data.job_id:
        raise_http_exception(
            self.log,
            status.HTTP_422_UNPROCESSABLE_ENTITY,
            f"Processing request job id field is set but must not be: {data.job_id}"
        )
    data.job_id = generate_id()
    tool_json = await self.get_worker_ocrd_tool(processor_name=data.processor_name)
    validate_job_input(self.log, data.processor_name, tool_json, data)

    if not data.workspace_id:
        # data.path_to_mets provided instead
        await create_workspace_if_not_exists(self.log, mets_path=data.path_to_mets)
    else:
        # only a check whether the workspace exists in the database
        await get_from_database_workspace(self.log, data.workspace_id)

    workspace_key = data.path_to_mets or data.workspace_id
    requests_cache = self.cache_processing_requests
    locked_pages_cache = self.cache_locked_pages

    # Initialize the request counter for the workspace_key (adding 0 leaves an existing value as is)
    requests_cache.update_request_counter(workspace_key=workspace_key, by_value=0)

    # Done this early to fail fast when a workspace_id is provided
    # but the abs mets path cannot be queried from the DB
    request_mets_path = await validate_and_return_mets_path(self.log, data)
    page_ids = expand_page_ids(data.page_id)

    # Whether the current request has to wait in the cache
    must_cache = False
    if data.depends_on:
        must_cache = await requests_cache.is_caching_required(data.depends_on)
    if not must_cache:
        # Dependencies do not force caching, so locked pages decide
        must_cache = locked_pages_cache.check_if_locked_pages_for_output_file_grps(
            workspace_key=workspace_key,
            output_file_grps=data.output_file_grps,
            page_ids=page_ids
        )

    if must_cache:
        requests_cache.cache_request(workspace_key, data)
        cached_job = DBProcessorJob(
            **data.dict(exclude_unset=True, exclude_none=True),
            internal_callback_url=self.internal_job_callback_url,
            state=JobState.cached
        )
        await cached_job.insert()
        return cached_job.to_job_output()

    locked_pages_cache.lock_pages(
        workspace_key=workspace_key,
        output_file_grps=data.output_file_grps,
        page_ids=page_ids
    )

    # Start a UDS Mets Server with the current workspace
    workspace_dir = str(Path(request_mets_path).parent)
    mets_server_url = self.deployer.start_uds_mets_server(ws_dir_path=workspace_dir)
    if self.use_tcp_mets:
        # let workers talk to mets server via tcp instead of using unix-socket
        mets_server_url = self.multiplexing_endpoint

    # Workers read mets_server_url from the database
    await db_update_workspace(
        workspace_id=data.workspace_id,
        workspace_mets_path=data.path_to_mets,
        mets_server_url=mets_server_url
    )

    queued_job = DBProcessorJob(
        **data.dict(exclude_unset=True, exclude_none=True),
        internal_callback_url=self.internal_job_callback_url,
        state=JobState.queued
    )
    await queued_job.insert()
    requests_cache.update_request_counter(workspace_key=workspace_key, by_value=1)
    return await self.push_job_to_worker(data=data, db_job=queued_job)
459
|
|
|
|
460
|
|
|
async def push_job_to_worker(self, data: PYJobInput, db_job: DBProcessorJob) -> PYJobOutput:
    """Push `db_job` to the processing queue and return the resulting job output.

    A falsy result from `push_job_to_processing_queue` is reported through
    `raise_http_exception` with HTTP 500.
    """
    self.log.debug(f"Pushing to Processing Worker: {data.processor_name}, {data.page_id}, {data.job_id}")
    job_output = await self.push_job_to_processing_queue(db_job=db_job)
    if not job_output:
        message = f"Failed to create job output for job input: {data}"
        raise_http_exception(self.log, status.HTTP_500_INTERNAL_SERVER_ERROR, message)
    return job_output
468
|
|
|
|
469
|
|
|
async def push_job_to_processing_queue(self, db_job: DBProcessorJob) -> PYJobOutput:
    """Encode `db_job` as a processing message and publish it to the queue named after its processor.

    A missing publisher and any failure while encoding or publishing are reported
    through `raise_http_exception` with HTTP 500.
    """
    publisher = self.rmq_publisher
    if not publisher:
        raise_http_exception(
            self.log,
            status.HTTP_500_INTERNAL_SERVER_ERROR,
            "The Processing Server has no connection to RabbitMQ Server. RMQPublisher is not connected."
        )
    processing_message = create_processing_message(self.log, db_job)
    try:
        publisher.publish_to_queue(
            queue_name=db_job.processor_name,
            message=OcrdProcessingMessage.encode_yml(processing_message)
        )
    except Exception as error:
        failure = (
            f"Processing server has failed to push processing message to queue: {db_job.processor_name}, "
            f"Processing message: {processing_message.__dict__}"
        )
        raise_http_exception(self.log, status.HTTP_500_INTERNAL_SERVER_ERROR, failure, error)
    return db_job.to_job_output()
484
|
|
|
|
485
|
|
|
async def get_processor_job(self, job_id: str) -> PYJobOutput:
    """Endpoint wrapper: delegate to `_get_processor_job` with this server's logger."""
    job_output = await _get_processor_job(self.log, job_id)
    return job_output
487
|
|
|
|
488
|
|
|
async def get_processor_job_log(self, job_id: str) -> FileResponse:
    """Endpoint wrapper: delegate to `_get_processor_job_log` with this server's logger."""
    log_response = await _get_processor_job_log(self.log, job_id)
    return log_response
490
|
|
|
|
491
|
|
|
async def _lock_pages_of_workspace(
    self, workspace_key: str, output_file_grps: List[str], page_ids: List[str]
) -> None:
    """Lock `page_ids` of `output_file_grps` for `workspace_key` in the locked-pages cache."""
    locked_pages_cache = self.cache_locked_pages
    locked_pages_cache.lock_pages(
        workspace_key=workspace_key, output_file_grps=output_file_grps, page_ids=page_ids
    )
500
|
|
|
|
501
|
|
|
async def _unlock_pages_of_workspace(
    self, workspace_key: str, output_file_grps: List[str], page_ids: List[str]
) -> None:
    """Unlock `page_ids` of `output_file_grps` for `workspace_key` in the locked-pages cache."""
    locked_pages_cache = self.cache_locked_pages
    locked_pages_cache.unlock_pages(
        workspace_key=workspace_key, output_file_grps=output_file_grps, page_ids=page_ids
    )
509
|
|
|
|
510
|
|
|
async def push_cached_jobs_to_workers(self, processing_jobs: List[PYJobInput]) -> None:
    """Queue jobs that were consumed from the request cache.

    For every job: set its DB state to `JobState.queued`, lock the pages of its
    output file groups, increase the workspace request counter by one and push
    the job to the worker queue.
    """
    if not processing_jobs:
        self.log.debug("No processing jobs were consumed from the requests cache")
        return
    for data in processing_jobs:
        self.log.info(f"Changing the job status of: {data.job_id} from {JobState.cached} to {JobState.queued}")
        db_consumed_job = await db_update_processing_job(job_id=data.job_id, state=JobState.queued)
        workspace_key = data.path_to_mets if data.path_to_mets else data.workspace_id

        # Lock the output file group pages for the current request
        await self._lock_pages_of_workspace(
            workspace_key=workspace_key,
            output_file_grps=data.output_file_grps,
            page_ids=expand_page_ids(data.page_id)
        )

        self.cache_processing_requests.update_request_counter(workspace_key=workspace_key, by_value=1)
        job_output = await self.push_job_to_worker(data=data, db_job=db_consumed_job)
        if not job_output:
            # No exception is being handled here, so `error` is used rather than
            # `exception` (which would append an empty traceback).
            self.log.error(f"Failed to create job output for job input data: {data}")
530
|
|
|
|
531
|
|
|
async def _cancel_cached_dependent_jobs(self, workspace_key: str, job_id: str) -> None:
    """Ask the request cache to cancel the jobs of `workspace_key` that depend on `job_id`."""
    requests_cache = self.cache_processing_requests
    await requests_cache.cancel_dependent_jobs(workspace_key=workspace_key, processing_job_id=job_id)
536
|
|
|
|
537
|
|
|
async def _consume_cached_jobs_of_workspace(
    self, workspace_key: str, mets_server_url: str, path_to_mets: str
) -> List[PYJobInput]:
    """Account for one finished job and return the cached requests consumed for the workspace.

    The request counter of `workspace_key` is decreased by one. If the internal
    queue of the workspace still holds requests, the result of
    `consume_cached_requests` is returned. Otherwise an empty list is returned and,
    once the counter is at or below zero, the METS Server of the workspace is
    stopped and the internal queue is deleted.
    """
    requests_cache = self.cache_processing_requests
    # decrease the internal cache counter by 1
    request_counter = requests_cache.update_request_counter(workspace_key=workspace_key, by_value=-1)
    self.log.debug(f"Internal processing job cache counter value: {request_counter}")

    cached_requests = requests_cache.processing_requests
    if workspace_key in cached_requests and len(cached_requests[workspace_key]):
        return await requests_cache.consume_cached_requests(workspace_key=workspace_key)

    # From here on the internal queue is either missing or empty
    if request_counter > 0:
        self.log.debug(f"Internal request cache is empty but waiting for {request_counter} result callbacks.")
        return []

    # Shut down the Mets Server for the workspace_key since no
    # more internal callbacks are expected for that workspace
    self.log.debug(f"Stopping the mets server: {mets_server_url}")
    self.deployer.stop_uds_mets_server(mets_server_url=mets_server_url, path_to_mets=path_to_mets)

    try:
        # The queue is empty - delete it
        del cached_requests[workspace_key]
    except KeyError:
        self.log.warning(f"Trying to delete non-existing internal queue with key: {workspace_key}")

    # For debugging purposes it is good to see if any locked pages are left
    self.log.debug(f"Contents of the locked pages cache for: {workspace_key}")
    locked_pages = self.cache_locked_pages.get_locked_pages(workspace_key=workspace_key)
    for output_file_grp in locked_pages:
        self.log.debug(f"{output_file_grp}: {locked_pages[output_file_grp]}")
    return []
573
|
|
|
|
574
|
|
|
async def remove_job_from_request_cache(self, result_message: PYResultMessage):
    """Result callback: handle the outcome of a finished processing job.

    Cancels the cached dependent jobs when the job has failed, unlocks the pages
    held by the job, then consumes the cached requests of the workspace and pushes
    them to the workers. A `ValueError` raised while loading the job or unlocking
    its pages is reported through `raise_http_exception` with HTTP 404.
    """
    job_id = result_message.job_id
    job_state = result_message.state
    path_to_mets = result_message.path_to_mets
    workspace_id = result_message.workspace_id
    self.log.info(f"Result job_id: {job_id}, state: {job_state}")

    db_workspace = await get_from_database_workspace(self.log, workspace_id, path_to_mets)
    mets_server_url = db_workspace.mets_server_url
    workspace_key = path_to_mets or workspace_id

    if job_state == JobState.failed:
        await self._cancel_cached_dependent_jobs(workspace_key, job_id)

    # TODO: Handle other potential error cases (any state other than JobState.success)

    try:
        finished_job = await db_get_processing_job(job_id)
        # Unlock the output file group pages for the result processing request
        await self._unlock_pages_of_workspace(
            workspace_key=workspace_key,
            output_file_grps=finished_job.output_file_grps,
            page_ids=expand_page_ids(finished_job.page_id)
        )
    except ValueError as error:
        not_found = f"Processing result job with id '{job_id}' not found in the DB."
        raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, not_found, error)

    next_jobs = await self._consume_cached_jobs_of_workspace(
        workspace_key=workspace_key, mets_server_url=mets_server_url, path_to_mets=path_to_mets
    )
    await self.push_cached_jobs_to_workers(processing_jobs=next_jobs)
608
|
|
|
|
609
|
|
|
async def list_processors(self) -> List[str]:
    """Return what `get_message_queues` reports for the configured RabbitMQ connection data."""
    queue_names = get_message_queues(self.log, self.rmq_data)
    return queue_names
611
|
|
|
|
612
|
|
|
async def task_sequence_to_processing_jobs(
    self,
    tasks: List[ProcessorTask],
    mets_path: str,
    page_id: str,
) -> List[PYJobOutput]:
    """Submit one processing job per task and return the job outputs in task order.

    A task depends on the job that most recently produced each of its input file
    groups within this sequence; input file groups not produced by an earlier task
    add no dependency.
    """
    # Maps an output file group to the id of the job that last produced it
    producer_job_ids = {}
    job_outputs = []
    for task in tasks:
        depends_on = [
            producer_job_ids[file_grp]
            for file_grp in task.input_file_grps
            if file_grp in producer_job_ids
        ]
        # NOTE: The `task.mets_path` and `task.page_id` is not utilized in low level
        # Thus, setting these two flags in the ocrd process workflow file has no effect
        job_input = PYJobInput(
            processor_name=task.executable,
            path_to_mets=mets_path,
            input_file_grps=task.input_file_grps,
            output_file_grps=task.output_file_grps,
            page_id=page_id,
            parameters=task.parameters,
            depends_on=depends_on,
        )
        job_output = await self.validate_and_forward_job_to_worker(
            processor_name=job_input.processor_name,
            data=job_input
        )
        for produced_grp in task.output_file_grps:
            producer_job_ids[produced_grp] = job_output.job_id
        job_outputs.append(job_output)
    return job_outputs
645
|
|
|
|
646
|
|
|
def validate_tasks_worker_existence(self, tasks: List[ProcessorTask]) -> None:
    """Check that a Processing Worker exists for the executable of every task.

    All tasks are checked before anything is reported, so the message lists every
    missing worker. Missing workers are reported through `raise_http_exception`
    with HTTP 406.
    """
    missing_workers = []
    for task in tasks:
        try:
            self.validate_worker_existence(processor_name=task.executable)
        except HTTPException:
            # The error itself is not relevant here, only which worker is missing.
            # Collect the plain name (not a one-element set) so the message reads cleanly.
            missing_workers.append(task.executable)
    if missing_workers:
        message = (
            "Workflow validation has failed. The desired Processing Worker was not found. "
            f"Missing Processing Workers: {missing_workers}"
        )
        raise_http_exception(self.log, status.HTTP_406_NOT_ACCEPTABLE, message)
660
|
|
|
|
661
|
|
|
async def run_workflow(
    self,
    mets_path: str,
    workflow: Union[UploadFile, str, None] = File(None),
    workflow_id: str = None,
    page_id: str = None,
    page_wise: bool = False,
    workflow_callback_url: str = None
) -> PYWorkflowJobOutput:
    """Submit the processing jobs of a workflow and store them as one workflow job.

    The workflow content is obtained through ``get_workflow_content`` from
    either ``workflow_id`` or ``workflow`` and parsed into processor tasks.
    Before any job is submitted, the input file groups of the first task and
    the existence of a Processing Worker for every task are validated.

    Args:
        mets_path: path to the METS file of the workspace.
        workflow: uploaded workflow file or a string, forwarded to
            ``get_workflow_content``.
        workflow_id: id of a workflow, forwarded to ``get_workflow_content``.
        page_id: page selection; ``None`` is treated as an empty string.
        page_wise: when true, one chain of jobs is created per page id
            returned by ``get_page_ids_list``; otherwise a single chain is
            created for the unexpanded ``page_id``.
        workflow_callback_url: stored on the workflow job record.

    Returns:
        The job output representation of the inserted ``DBWorkflowJob``.
    """
    await create_workspace_if_not_exists(self.log, mets_path=mets_path)
    workflow_content = await get_workflow_content(self.log, workflow_id, workflow)
    processing_tasks = parse_workflow_tasks(self.log, workflow_content)

    # Only the first task's input file groups are checked against the METS file
    first_task = processing_tasks[0]
    validate_first_task_input_file_groups_existence(self.log, mets_path, first_task.input_file_grps)

    # Every ocr-d processor referenced by a task needs a Processing Worker
    self.validate_tasks_worker_existence(processing_tasks)

    # The expanded page list is only needed in page_wise mode;
    # the database record keeps the short, unexpanded string
    page_id = page_id or ''
    page_ids = get_page_ids_list(self.log, mets_path, page_id)

    # One job chain per target: each expanded page in page_wise mode,
    # otherwise a single chain keyed by the unexpanded page_id
    targets = page_ids if page_wise else [page_id]
    job_ids_per_target = {}
    for target in targets:
        submitted = await self.task_sequence_to_processing_jobs(
            tasks=processing_tasks,
            mets_path=mets_path,
            page_id=target,
        )
        job_ids_per_target[target] = [job_output.job_id for job_output in submitted]

    db_workflow_job = DBWorkflowJob(
        job_id=generate_id(),
        page_id=page_id,
        page_wise=page_wise,
        processing_job_ids=job_ids_per_target,
        path_to_mets=mets_path,
        workflow_callback_url=workflow_callback_url
    )
    await db_workflow_job.insert()
    return db_workflow_job.to_job_output()
723
|
|
|
|
724
|
|
|
@staticmethod
def _produce_workflow_status_response(processing_jobs: List[DBProcessorJob]) -> Dict:
    """Summarise processing jobs per processor and per job state.

    The result maps each processor name to a dict of ``state value -> count``.
    When at least one job is in the failed state, the extra key
    ``"failed-processor-tasks"`` maps each processor name to a list of
    ``{"job_id": ..., "page_id": ...}`` entries of its failed jobs.

    Args:
        processing_jobs: the processing jobs to summarise.

    Returns:
        The summary dict described above; empty when no jobs are given.
    """
    failed_tasks_key = "failed-processor-tasks"
    failures_by_processor = {}
    summary = {}
    for job in processing_jobs:
        state_counts = summary.setdefault(job.processor_name, {})
        state_counts[job.state.value] = state_counts.get(job.state.value, 0) + 1
        if job.state != JobState.failed:
            continue
        # The failure section is only attached once the first failed job shows up
        if failed_tasks_key not in summary:
            summary[failed_tasks_key] = failures_by_processor
        failures_by_processor.setdefault(job.processor_name, []).append(
            {"job_id": job.job_id, "page_id": job.page_id}
        )
    return summary
741
|
|
|
|
742
|
|
|
@staticmethod
def _produce_workflow_status_simple_response(processing_jobs: List[DBProcessorJob]) -> JobState:
    """Reduce the states of all processing jobs to a single workflow state.

    Jobs are scanned in order:
    - cached and queued jobs do not influence the result;
    - the first failed or cancelled job makes the result FAILED and stops the scan;
    - a running job sets the result to RUNNING;
    - the result is SUCCESS only when the number of successful jobs equals
      the total number of jobs.
    If none of the above applies, the result stays UNSET.

    Args:
        processing_jobs: the processing jobs of one workflow job.

    Returns:
        The aggregated ``JobState``.
    """
    overall_state = JobState.unset
    succeeded = 0
    for job in processing_jobs:
        state = job.state
        if state in (JobState.cached, JobState.queued):
            continue
        if state in (JobState.failed, JobState.cancelled):
            overall_state = JobState.failed
            break
        if state == JobState.running:
            overall_state = JobState.running
        elif state == JobState.success:
            succeeded += 1
    if succeeded == len(processing_jobs):
        overall_state = JobState.success
    return overall_state
759
|
|
|
|
760
|
|
|
async def get_workflow_info(self, workflow_job_id) -> Dict:
    """ Return a per-processor status summary of a workflow's processor jobs

    The workflow job is loaded from the database, the processing job ids of
    all its entries are collected into one flat list, and the matching jobs
    are summarised by `_produce_workflow_status_response`.
    """
    workflow_job = await get_from_database_workflow_job(self.log, workflow_job_id)
    job_ids: List[str] = []
    for ids_of_entry in workflow_job.processing_job_ids.values():
        job_ids.extend(ids_of_entry)
    jobs = await db_get_processing_jobs(job_ids)
    return self._produce_workflow_status_response(processing_jobs=jobs)
768
|
|
|
|
769
|
|
|
async def kill_mets_server_zombies(self, minutes_ago: Optional[int] = None, dry_run: Optional[bool] = None) -> List[int]:
    """ Delegate to the `kill_mets_server_zombies` helper imported from `.server_utils`

    Both arguments are passed through unchanged and the helper's result is returned.
    """
    return kill_mets_server_zombies(minutes_ago=minutes_ago, dry_run=dry_run)
772
|
|
|
|
773
|
|
|
async def get_workflow_info_simple(self, workflow_job_id) -> Dict[str, JobState]:
    """
    Condensed variant of `get_workflow_info`: reports one state for the whole workflow job.
    - A failed processing job makes the whole workflow job FAILED.
    - A running processing job makes the workflow job RUNNING, no matter how many
      other jobs are still QUEUED or CACHED.
    - The workflow job is SUCCESS only once every processing job has finished successfully.

    The result has the form `{"state": <JobState>}`.
    """
    workflow_job = await get_from_database_workflow_job(self.log, workflow_job_id)
    job_ids: List[str] = []
    for ids_of_entry in workflow_job.processing_job_ids.values():
        job_ids.extend(ids_of_entry)
    jobs = await db_get_processing_jobs(job_ids)
    return {"state": self._produce_workflow_status_simple_response(processing_jobs=jobs)}
786
|
|
|
|
787
|
|
|
async def upload_workflow(self, workflow: Union[UploadFile, str]) -> Dict[str, str]:
    """ Store a new workflow script in the database

    A string `workflow` is treated as a path and the file is read; anything
    else is passed to `generate_workflow_content`. The content is validated
    and hashed. If a script with the same content hash is already stored, an
    HTTP 409 error is raised through `raise_http_exception`; otherwise the
    script is inserted under a freshly generated id.

    Returns a dict of the form `{"workflow_id": <new id>}`.
    """
    if not isinstance(workflow, str):
        script_text = await generate_workflow_content(workflow)
    else:
        with open(workflow) as script_file:
            script_text = script_file.read()
    validate_workflow(self.log, script_text)
    script_hash = generate_workflow_content_hash(script_text)
    try:
        existing_script = await db_find_first_workflow_script_by_content(script_hash)
        if existing_script:
            message = f"The same workflow script already exists, workflow id: {existing_script.workflow_id}"
            raise_http_exception(self.log, status.HTTP_409_CONFLICT, message)
    except ValueError:
        # NOTE(review): presumably the lookup raises ValueError when no script
        # with this hash exists - confirm against the database module
        pass
    new_workflow_id = generate_id()
    new_script = DBWorkflowScript(
        workflow_id=new_workflow_id,
        content=script_text,
        content_hash=script_hash
    )
    await new_script.insert()
    return {"workflow_id": new_workflow_id}
812
|
|
|
|
813
|
|
|
async def replace_workflow(self, workflow_id, workflow: Union[UploadFile, str]) -> Dict[str, str]:
    """ Update a workflow script file in the database

    The stored script for `workflow_id` is looked up first. A string
    `workflow` is treated as a path and the file is read; anything else is
    passed to `generate_workflow_content`. The new content is validated,
    then content and content hash of the stored script are replaced and saved.

    Returns a dict of the form `{"workflow_id": <id of the updated script>}`.

    A `ValueError` from the database lookup is reported as HTTP 404 through
    `raise_http_exception`.
    """
    try:
        db_workflow_script = await db_get_workflow_script(workflow_id)
    except ValueError as error:
        message = f"Workflow script not existing for id '{workflow_id}'."
        raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message, error)
    else:
        # Deliberately outside the `try`: a ValueError raised while reading or
        # validating the new content (e.g. UnicodeDecodeError, which is a
        # ValueError subclass) must not be misreported as "script not existing".
        if isinstance(workflow, str):
            with open(workflow) as wf_file:
                workflow_content = wf_file.read()
        else:
            workflow_content = await generate_workflow_content(workflow)
        validate_workflow(self.log, workflow_content)
        db_workflow_script.content = workflow_content
        db_workflow_script.content_hash = generate_workflow_content_hash(workflow_content)
        await db_workflow_script.save()
        return {"workflow_id": db_workflow_script.workflow_id}
832
|
|
|
|
833
|
|
|
async def download_workflow(self, workflow_id) -> PlainTextResponse:
    """ Return the content of a stored workflow script as plain text

    A `ValueError` raised while loading the script is reported as HTTP 404
    through `raise_http_exception`.
    """
    try:
        stored_script = await db_get_workflow_script(workflow_id)
        return PlainTextResponse(stored_script.content)
    except ValueError as error:
        raise_http_exception(
            self.log,
            status.HTTP_404_NOT_FOUND,
            f"Workflow script not existing for id '{workflow_id}'.",
            error
        )
842
|
|
|
|