import json
import requests
import httpx
from typing import Dict, List
import uvicorn

from fastapi import FastAPI, status, Request, HTTPException
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse

from pika.exceptions import ChannelClosedByBroker
from ocrd_utils import getLogger
from .database import (
    initiate_database,
    db_get_processing_job,
    db_get_workspace,
    db_update_workspace,
)
from .deployer import Deployer
from .models import (
    DBProcessorJob,
    PYJobInput,
    PYJobOutput,
    PYResultMessage,
    StateEnum
)
from .rabbitmq_utils import (
    RMQPublisher,
    OcrdProcessingMessage
)
from .server_utils import (
    _get_processor_job,
    expand_page_ids,
    validate_and_return_mets_path,
    validate_job_input,
)
from .utils import (
    download_ocrd_all_tool_json,
    generate_created_time,
    generate_id
)


class ProcessingServer(FastAPI):
    """FastAPI app to make OCR-D processor calls

    The Processing-Server receives calls conforming to the OCR-D web API regarding the processing
    part. It can run ocrd-processors and provides endpoints to discover processors and to watch
    the job status.

    The Processing-Server does not execute the processors itself, but starts up a queue and a
    database to delegate the calls to processing workers. The workers are started by the
    Processing-Server and the communication goes through the queue.
    """

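    # Example usage (illustrative only; the config file name, host and port below are
    # placeholder values, not defined anywhere in this module):
    #
    #   server = ProcessingServer(config_path='processing_server_config.yml',
    #                             host='localhost', port=8000)
    #   server.start()
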
    def __init__(self, config_path: str, host: str, port: int) -> None:
        super().__init__(on_startup=[self.on_startup], on_shutdown=[self.on_shutdown],
                         title='OCR-D Processing Server',
                         description='OCR-D processing and processors')
        self.log = getLogger(__name__)
        self.log.info("Downloading ocrd all tool json")
        self.ocrd_all_tool_json = download_ocrd_all_tool_json(
            ocrd_all_url="https://ocr-d.de/js/ocrd-all-tool.json"
        )
        self.hostname = host
        self.port = port
        # The deployer is used for:
        # - deploying agents when the Processing Server is started
        # - retrieving runtime data of agents
        self.deployer = Deployer(config_path)
        self.mongodb_url = None
        # TODO: Combine these under a single URL, rabbitmq_utils needs an update
        self.rmq_host = self.deployer.data_queue.address
        self.rmq_port = self.deployer.data_queue.port
        self.rmq_vhost = '/'
        self.rmq_username = self.deployer.data_queue.username
        self.rmq_password = self.deployer.data_queue.password

        # Gets assigned when `connect_publisher` is called on the working object
        self.rmq_publisher = None

        # Used for buffering/caching processing requests in the Processing Server
        # Key: `workspace_id` or `path_to_mets` depending on which is provided
        # Value: Queue that holds PYJobInput elements
        self.processing_requests_cache = {}

        # Used by processing workers and/or processor servers to report back the results
        if self.deployer.internal_callback_url:
            host = self.deployer.internal_callback_url
            self.internal_job_callback_url = f'{host.rstrip("/")}/result_callback'
        else:
            self.internal_job_callback_url = f'http://{host}:{port}/result_callback'

        # Create routes
        self.router.add_api_route(
            path='/stop',
            endpoint=self.stop_deployed_agents,
            methods=['POST'],
            tags=['tools'],
            summary='Stop database, queue and processing-workers',
        )

        self.router.add_api_route(
            path='/processor/{processor_name}',
            endpoint=self.push_processor_job,
            methods=['POST'],
            tags=['processing'],
            status_code=status.HTTP_200_OK,
            summary='Submit a job to this processor',
            response_model=PYJobOutput,
            response_model_exclude_unset=True,
            response_model_exclude_none=True
        )

        self.router.add_api_route(
            path='/processor/{processor_name}/{job_id}',
            endpoint=self.get_processor_job,
            methods=['GET'],
            tags=['processing'],
            status_code=status.HTTP_200_OK,
            summary='Get information about a job based on its ID',
            response_model=PYJobOutput,
            response_model_exclude_unset=True,
            response_model_exclude_none=True
        )

        self.router.add_api_route(
            path='/result_callback',
            endpoint=self.remove_from_request_cache,
            methods=['POST'],
            tags=['processing'],
            status_code=status.HTTP_200_OK,
            summary='Callback used by a worker or processor server for reporting result of a processing request',
        )

        self.router.add_api_route(
            path='/processor/{processor_name}',
            endpoint=self.get_processor_info,
            methods=['GET'],
            tags=['processing', 'discovery'],
            status_code=status.HTTP_200_OK,
            summary='Get information about this processor',
        )

        self.router.add_api_route(
            path='/processor',
            endpoint=self.list_processors,
            methods=['GET'],
            tags=['processing', 'discovery'],
            status_code=status.HTTP_200_OK,
            summary='Get a list of all available processors',
        )

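        # Replace FastAPI's default body for validation errors (HTTP 422) with a flat
        # JSON structure and make sure the offending request is logged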
        @self.exception_handler(RequestValidationError)
        async def validation_exception_handler(request: Request, exc: RequestValidationError):
            exc_str = f'{exc}'.replace('\n', ' ').replace('   ', ' ')
            self.log.error(f'{request}: {exc_str}')
            content = {'status_code': 10422, 'message': exc_str, 'data': None}
            return JSONResponse(content=content, status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)

    def start(self) -> None:
        """ deploy agents (db, queue, workers) and start the processing server with uvicorn
        """
        try:
            self.deployer.deploy_rabbitmq(image='rabbitmq:3-management', detach=True, remove=True)
            rabbitmq_url = self.deployer.data_queue.url

            self.deployer.deploy_mongodb(image='mongo', detach=True, remove=True)
            self.mongodb_url = self.deployer.data_mongo.url

            # The RMQPublisher is initialized and a connection to the RabbitMQ is performed
            self.connect_publisher()
            self.log.debug(f'Creating message queues on RabbitMQ instance url: {rabbitmq_url}')
            self.create_message_queues()

            self.deployer.deploy_hosts(
                mongodb_url=self.mongodb_url,
                rabbitmq_url=rabbitmq_url
            )
        except Exception:
            self.log.error('Error during startup of processing server. '
                           'Trying to kill parts of incompletely deployed service')
            self.deployer.kill_all()
            raise
        uvicorn.run(self, host=self.hostname, port=int(self.port))

    async def on_startup(self):
        await initiate_database(db_url=self.mongodb_url)

    async def on_shutdown(self) -> None:
        """
        - hosts and pids should be stored somewhere
        - ensure queue is empty or processor is not currently running
        - connect to hosts and kill pids
        """
        await self.stop_deployed_agents()

    async def stop_deployed_agents(self) -> None:
        self.deployer.kill_all()

    def connect_publisher(self, enable_acks: bool = True) -> None:
        self.log.info(f'Connecting RMQPublisher to RabbitMQ server: '
                      f'{self.rmq_host}:{self.rmq_port}{self.rmq_vhost}')
        self.rmq_publisher = RMQPublisher(
            host=self.rmq_host,
            port=self.rmq_port,
            vhost=self.rmq_vhost
        )
        self.log.debug(f'RMQPublisher authenticates with username: '
                       f'{self.rmq_username}, password: {self.rmq_password}')
        self.rmq_publisher.authenticate_and_connect(
            username=self.rmq_username,
            password=self.rmq_password
        )
        if enable_acks:
            self.rmq_publisher.enable_delivery_confirmations()
            self.log.info('Delivery confirmations are enabled')
        self.log.info('Successfully connected RMQPublisher.')

    def create_message_queues(self) -> None:
        """ Create the message queues based on the occurrence of
        `workers.name` in the config file.
        """

        # TODO: Remove
        """
        queue_names = set([])
        for data_host in self.deployer.data_hosts:
            for data_worker in data_host.data_workers:
                queue_names.add(data_worker.processor_name)
        """

        # The abstract version of the above lines
        queue_names = self.deployer.find_matching_processors(
            worker_only=True,
            str_names_only=True,
            unique_only=True
        )

        for queue_name in queue_names:
            # The existence/validity of the worker.name is not tested.
            # Even if an ocr-d processor does not exist, the queue is created
            self.log.info(f'Creating a message queue with id: {queue_name}')
            self.rmq_publisher.create_queue(queue_name=queue_name)

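    # Converts a DB job entry into the message format that the processing workers
    # consume from their RabbitMQ queue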
    @staticmethod
    def create_processing_message(job: DBProcessorJob) -> OcrdProcessingMessage:
        processing_message = OcrdProcessingMessage(
            job_id=job.job_id,
            processor_name=job.processor_name,
            created_time=generate_created_time(),
            path_to_mets=job.path_to_mets,
            workspace_id=job.workspace_id,
            input_file_grps=job.input_file_grps,
            output_file_grps=job.output_file_grps,
            page_id=job.page_id,
            parameters=job.parameters,
            result_queue_name=job.result_queue_name,
            callback_url=job.callback_url,
            internal_callback_url=job.internal_callback_url
        )
        return processing_message

    def check_if_queue_exists(self, processor_name):
        try:
            # Only checks if the process queue exists, if not raises ChannelClosedByBroker
            self.rmq_publisher.create_queue(processor_name, passive=True)
        except ChannelClosedByBroker as error:
            self.log.warning(f"Process queue with id '{processor_name}' does not exist: {error}")
            # Reconnect publisher - not efficient, but works
            # TODO: Revisit when reconnection strategy is implemented
            self.connect_publisher(enable_acks=True)
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=f"Process queue with id '{processor_name}' does not exist"
            )

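    # Page locking: `workspace_db.pages_locked` maps each output fileGrp to the page ids
    # currently being written by an already submitted job, e.g. (illustrative values only):
    #   {"OCR-D-BIN": ["PHYS_0001", "PHYS_0002"], "OCR-D-SEG": ["all_pages"]}
    # The sentinel "all_pages" is used when a job processes the whole workspace.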
    def check_if_locked_pages_for_output_file_grps(
            self,
            locked_ws_pages: Dict,
            output_file_grps: List[str],
            page_ids: List[str]
    ) -> bool:
        for output_fileGrp in output_file_grps:
            self.log.debug(f"Checking output file group: {output_fileGrp}")
            if output_fileGrp in locked_ws_pages:
                self.log.debug(f"Locked workspace pages has entry for output file group: {output_fileGrp}")
                if "all_pages" in locked_ws_pages[output_fileGrp]:
                    self.log.debug("Caching the received request due to locked output file grp pages")
                    return True
                # If there are request page ids that are already locked
                if not set(locked_ws_pages[output_fileGrp]).isdisjoint(page_ids):
                    self.log.debug("Caching the received request due to locked output file grp pages")
                    return True
        # No overlap with currently locked pages was found
        return False

    def lock_pages(self, locked_ws_pages: Dict, output_file_grps: List[str], page_ids: List[str]):
        for output_fileGrp in output_file_grps:
            if output_fileGrp not in locked_ws_pages:
                self.log.debug(f"Creating an empty list for output file grp: {output_fileGrp}")
                locked_ws_pages[output_fileGrp] = []
            # The page id list is not empty - only some pages are in the request
            if page_ids:
                self.log.debug(f"Locking pages for `{output_fileGrp}`: {page_ids}")
                locked_ws_pages[output_fileGrp].extend(page_ids)
            else:
                # Lock all pages with a single value
                self.log.debug(f"Locking all pages for `{output_fileGrp}`")
                locked_ws_pages[output_fileGrp].append("all_pages")

    def unlock_pages(self, locked_ws_pages: Dict, output_file_grps: List[str], page_ids: List[str]):
        for output_fileGrp in output_file_grps:
            if output_fileGrp in locked_ws_pages:
                if page_ids:
                    # Unlock the previously locked pages
                    self.log.debug(f"Unlocking pages of `{output_fileGrp}`: {page_ids}")
                    locked_ws_pages[output_fileGrp] = [x for x in locked_ws_pages[output_fileGrp] if
                                                       x not in page_ids]
                    self.log.debug(f"Remaining locked pages of `{output_fileGrp}`: {locked_ws_pages[output_fileGrp]}")
                else:
                    # Remove the single variable used to indicate all pages are locked
                    self.log.debug(f"Unlocking all pages for: {output_fileGrp}")
                    locked_ws_pages[output_fileGrp].remove("all_pages")

    # Returns true if all dependent jobs' states are success, else false
    async def check_if_job_dependencies_met(self, dependencies: List[str]) -> bool:
        # Check the states of all dependent jobs
        for dependency_job_id in dependencies:
            self.log.debug(f"dependency_job_id: {dependency_job_id}")
            try:
                dependency_job_state = (await db_get_processing_job(dependency_job_id)).state
            except ValueError:
                # job_id not (yet) in db. Dependency not met
                return False
            self.log.debug(f"dependency_job_state: {dependency_job_state}")
            # Found a dependent job whose state is not success
            if dependency_job_state != StateEnum.success:
                return False
        return True

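    # Scans the internal (per-workspace) queue and consumes every cached request
    # whose job dependencies have already finished successfully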
    async def find_next_requests_from_internal_queue(self, internal_queue: List[PYJobInput]) -> List[PYJobInput]:
        found_requests = []
        # Iterate over a snapshot of the queue so that consuming elements below does not
        # shift the indices of the list being iterated and skip cached requests
        for current_element in list(internal_queue):
            # Request has other job dependencies
            if current_element.depends_on:
                self.log.debug(f"current_element: {current_element}")
                self.log.debug(f"job dependencies: {current_element.depends_on}")
                satisfied_dependencies = await self.check_if_job_dependencies_met(current_element.depends_on)
                self.log.debug(f"satisfied dependencies: {satisfied_dependencies}")
                if not satisfied_dependencies:
                    continue
            # Consume the request from the internal queue
            internal_queue.remove(current_element)
            self.log.debug(f"found cached request to be processed: {current_element}")
            found_requests.append(current_element)
        return found_requests

    def query_ocrd_tool_json_from_server(self, processor_name):
        processor_server_url = self.deployer.resolve_processor_server_url(processor_name)
        if not processor_server_url:
            self.log.exception(f"Processor Server of '{processor_name}' is not available")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Processor Server of '{processor_name}' is not available"
            )
        # Request the tool json from the Processor Server
        response = requests.get(
            processor_server_url,
            headers={'Content-Type': 'application/json'}
        )
        if response.status_code != 200:
            self.log.exception(f"Failed to retrieve '{processor_name}' from: {processor_server_url}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Failed to retrieve '{processor_name}' from: {processor_server_url}"
            )
        ocrd_tool = response.json()
        return ocrd_tool, processor_server_url

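    # A request posted to `/processor/{processor_name}` carries a PYJobInput body.
    # Minimal illustrative example (field values are placeholders, not defaults):
    #   {
    #       "path_to_mets": "/data/workspace/mets.xml",
    #       "input_file_grps": ["OCR-D-IMG"],
    #       "output_file_grps": ["OCR-D-BIN"],
    #       "agent_type": "worker"
    #   }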
    async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJobOutput:
        if data.job_id:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=f"Job id field is set but must not be: {data.job_id}"
            )
        # Generate processing job id
        data.job_id = generate_id()

        # Append the processor name to the request itself
        data.processor_name = processor_name

        if data.agent_type not in ['worker', 'server']:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=f"Unknown network agent with value: {data.agent_type}"
            )
        workspace_db = await db_get_workspace(
            workspace_id=data.workspace_id,
            workspace_mets_path=data.path_to_mets
        )
        if not workspace_db:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=f"Workspace with id: {data.workspace_id} or path: {data.path_to_mets} not found"
            )

        # Since the path is not resolved yet,
        # the return value is not important for the Processing Server
        await validate_and_return_mets_path(self.log, data)

        page_ids = expand_page_ids(data.page_id)

        # A flag whether the current request must be cached
        # This is set to true if for any output fileGrp there
        # is a page_id value that has been previously locked
        cache_current_request = False

        # Check if there are any dependencies of the current request
        if data.depends_on:
            if not await self.check_if_job_dependencies_met(data.depends_on):
                self.log.debug("Caching the received request due to job dependencies")
                cache_current_request = True

        locked_ws_pages = workspace_db.pages_locked

        # No need for further check of locked pages dependency
        # if the request should be already cached
        if not cache_current_request:
            # Check if there are any locked pages for the current request
            cache_current_request = self.check_if_locked_pages_for_output_file_grps(
                locked_ws_pages=locked_ws_pages,
                output_file_grps=data.output_file_grps,
                page_ids=page_ids
            )

        if cache_current_request:
            workspace_key = data.workspace_id if data.workspace_id else data.path_to_mets
            # If a record queue of this workspace_id does not exist in the requests cache
            if not self.processing_requests_cache.get(workspace_key, None):
                self.log.debug(f"Creating an internal queue for workspace_key: {workspace_key}")
                self.processing_requests_cache[workspace_key] = []
            self.log.debug(f"Caching the processing request: {data}")
            # Add the processing request to the end of the internal queue
            self.processing_requests_cache[workspace_key].append(data)

            return PYJobOutput(
                job_id=data.job_id,
                processor_name=processor_name,
                workspace_id=data.workspace_id,
                workspace_path=data.path_to_mets,
                state=StateEnum.cached
            )
        else:
            # Update locked pages by locking the pages in the request
            self.lock_pages(
                locked_ws_pages=locked_ws_pages,
                output_file_grps=data.output_file_grps,
                page_ids=page_ids
            )

            # Update the locked pages dictionary in the database
            await db_update_workspace(
                workspace_id=data.workspace_id,
                workspace_mets_path=data.path_to_mets,
                pages_locked=locked_ws_pages
            )

        # Create a DB entry
        job = DBProcessorJob(
            **data.dict(exclude_unset=True, exclude_none=True),
            internal_callback_url=self.internal_job_callback_url,
            state=StateEnum.queued
        )
        await job.insert()

        job_output = None
        if data.agent_type == 'worker':
            ocrd_tool = await self.get_processor_info(processor_name)
            validate_job_input(self.log, processor_name, ocrd_tool, data)
            processing_message = self.create_processing_message(job)
            await self.push_to_processing_queue(processor_name, processing_message)
            job_output = job.to_job_output()
        if data.agent_type == 'server':
            ocrd_tool, processor_server_url = self.query_ocrd_tool_json_from_server(processor_name)
            validate_job_input(self.log, processor_name, ocrd_tool, data)
            job_output = await self.push_to_processor_server(processor_name, processor_server_url, data)
        if not job_output:
            self.log.exception('Failed to create job output')
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail='Failed to create job output'
            )
        return job_output

    # TODO: Revisit and remove duplications between push_to_* methods
    async def push_to_processing_queue(self, processor_name: str, processing_message: OcrdProcessingMessage):
        if not self.rmq_publisher:
            raise Exception('RMQPublisher is not connected')
        deployed_processors = self.deployer.find_matching_processors(
            worker_only=True,
            str_names_only=True,
            unique_only=True
        )
        if processor_name not in deployed_processors:
            self.check_if_queue_exists(processor_name)

        encoded_processing_message = OcrdProcessingMessage.encode_yml(processing_message)
        try:
            self.rmq_publisher.publish_to_queue(processor_name, encoded_processing_message)
        except Exception as error:
            self.log.exception(f'RMQPublisher has failed: {error}')
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f'RMQPublisher has failed: {error}'
            )

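    # Forwards the job to a standalone Processor Server over HTTP instead of the
    # RabbitMQ queue; the request timeout is derived from an assumed page count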
    async def push_to_processor_server(
            self,
            processor_name: str,
            processor_server_url: str,
            job_input: PYJobInput
    ) -> PYJobOutput:
        try:
            json_data = json.dumps(job_input.dict(exclude_unset=True, exclude_none=True))
        except Exception as e:
            self.log.exception(f"Failed to json dump the PYJobInput, error: {e}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Failed to json dump the PYJobInput, error: {e}"
            )

        # TODO: The amount of pages should come as a request input
        # TODO: cf https://github.com/OCR-D/core/pull/1030/files#r1152551161
        # currently, use 200 as a default
        amount_of_pages = 200
        request_timeout = 20.0 * amount_of_pages  # 20 sec timeout per page
        # Post a processing job to the Processor Server asynchronously
        timeout = httpx.Timeout(timeout=request_timeout, connect=30.0)
        async with httpx.AsyncClient(timeout=timeout) as client:
            response = await client.post(
                processor_server_url,
                headers={'Content-Type': 'application/json'},
                json=json.loads(json_data)
            )

        if response.status_code != 202:
            self.log.exception(f"Failed to post '{processor_name}' job to: {processor_server_url}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Failed to post '{processor_name}' job to: {processor_server_url}"
            )
        job_output = response.json()
        return job_output

    async def get_processor_job(self, processor_name: str, job_id: str) -> PYJobOutput:
        return await _get_processor_job(self.log, processor_name, job_id)

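    # Handler for POST `/result_callback`: invoked by a processing worker or a processor
    # server once a job has finished; unlocks the pages of that job and pushes any cached
    # requests whose dependencies are now met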
    async def remove_from_request_cache(self, result_message: PYResultMessage):
        job_id = result_message.job_id
        state = result_message.state
        path_to_mets = result_message.path_to_mets
        workspace_id = result_message.workspace_id

        self.log.debug(f"Received result for job with id: {job_id} has state: {state}")

        if state == StateEnum.failed:
            # TODO: Call the callback to the Workflow server if the current processing step has failed
            pass

        if state != StateEnum.success:
            # TODO: Handle other potential error cases
            pass

        job_db = await db_get_processing_job(job_id)
        if not job_db:
            self.log.exception(f"Processing job with id: {job_id} not found in DB")
        job_output_file_grps = job_db.output_file_grps
        job_page_ids = expand_page_ids(job_db.page_id)

        # Read DB workspace entry
        workspace_db = await db_get_workspace(
            workspace_id=workspace_id,
            workspace_mets_path=path_to_mets
        )
        if not workspace_db:
            self.log.exception(f"Workspace with id: {workspace_id} or path: {path_to_mets} not found in DB")

        locked_ws_pages = workspace_db.pages_locked
        # Update locked pages by unlocking the pages in the request
        self.unlock_pages(
            locked_ws_pages=locked_ws_pages,
            output_file_grps=job_output_file_grps,
            page_ids=job_page_ids
        )

        # Update the locked pages dictionary in the database
        await db_update_workspace(
            workspace_id=workspace_id,
            workspace_mets_path=path_to_mets,
            pages_locked=locked_ws_pages
        )

        # Take the next request from the cache (if any available)
        workspace_key = workspace_id if workspace_id else path_to_mets

        if workspace_key not in self.processing_requests_cache:
            self.log.debug(f"No internal queue available for workspace with key: {workspace_key}")
            return

        if not self.processing_requests_cache[workspace_key]:
            # The queue is empty - delete it
            try:
                del self.processing_requests_cache[workspace_key]
            except KeyError:
                self.log.warning(f"Trying to delete non-existing internal queue with key: {workspace_key}")
            return

        consumed_requests = await self.find_next_requests_from_internal_queue(
            internal_queue=self.processing_requests_cache[workspace_key]
        )

        if not consumed_requests:
            self.log.debug("No data was consumed from the internal queue")
            return

        for data in consumed_requests:
            processor_name = data.processor_name
            # Create a DB entry
            job = DBProcessorJob(
                **data.dict(exclude_unset=True, exclude_none=True),
                internal_callback_url=self.internal_job_callback_url,
                state=StateEnum.queued
            )
            await job.insert()

            job_output = None
            if data.agent_type == 'worker':
                ocrd_tool = await self.get_processor_info(processor_name)
                validate_job_input(self.log, processor_name, ocrd_tool, data)
                processing_message = self.create_processing_message(job)
                await self.push_to_processing_queue(processor_name, processing_message)
                job_output = job.to_job_output()
            if data.agent_type == 'server':
                ocrd_tool, processor_server_url = self.query_ocrd_tool_json_from_server(processor_name)
                validate_job_input(self.log, processor_name, ocrd_tool, data)
                job_output = await self.push_to_processor_server(processor_name, processor_server_url, data)
            if not job_output:
                self.log.exception(f'Failed to create job output for job input data: {data}')

    async def get_processor_info(self, processor_name) -> Dict:
        """ Return a processor's ocrd-tool.json
        """
        ocrd_tool = self.ocrd_all_tool_json.get(processor_name, None)
        if not ocrd_tool:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Ocrd tool JSON of '{processor_name}' not available!"
            )

        # TODO: Returns the ocrd tool json even of processors
        # that are not deployed. This may or may not be desired.
        return ocrd_tool

    async def list_processors(self) -> List[str]:
        # There is no caching on the Processing Server side
        processor_names_list = self.deployer.find_matching_processors(
            docker_only=False,
            native_only=False,
            worker_only=False,
            server_only=False,
            str_names_only=True,
            unique_only=True
        )
        return processor_names_list