from fastapi import HTTPException, status, UploadFile
from fastapi.responses import FileResponse
from httpx import AsyncClient, Timeout
from json import dumps, loads
from logging import Logger
from pathlib import Path
from requests import get as requests_get
from typing import Dict, List, Optional, Union
from urllib.parse import urljoin

from ocrd.resolver import Resolver
from ocrd.task_sequence import ProcessorTask
from ocrd.workspace import Workspace
from ocrd_validators import ParameterValidator

from .database import (
    db_create_workspace,
    db_get_processing_job,
    db_get_workflow_job,
    db_get_workflow_script,
    db_get_workspace
)
from .models import DBProcessorJob, DBWorkflowJob, DBWorkspace, PYJobInput, PYJobOutput
from .rabbitmq_utils import OcrdProcessingMessage
from .utils import (
    calculate_processing_request_timeout,
    expand_page_ids,
    generate_created_time,
    generate_workflow_content,
    get_ocrd_workspace_physical_pages
)


def create_processing_message(logger: Logger, job: DBProcessorJob) -> OcrdProcessingMessage:
    try:
        processing_message = OcrdProcessingMessage(
            job_id=job.job_id,
            processor_name=job.processor_name,
            created_time=generate_created_time(),
            path_to_mets=job.path_to_mets,
            workspace_id=job.workspace_id,
            input_file_grps=job.input_file_grps,
            output_file_grps=job.output_file_grps,
            page_id=job.page_id,
            parameters=job.parameters,
            result_queue_name=job.result_queue_name,
            callback_url=job.callback_url,
            internal_callback_url=job.internal_callback_url
        )
        return processing_message
    except ValueError as error:
        message = "Failed to create OcrdProcessingMessage from DBProcessorJob"
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)
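
# Usage sketch (illustrative only, not part of the module's API), assuming a
# DBProcessorJob fetched via db_get_processing_job and a configured logger:
#
#     processing_message = create_processing_message(logger, db_job)
#     # The resulting OcrdProcessingMessage is what the Processing Server
#     # publishes to the processor's RabbitMQ queue.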


async def create_workspace_if_not_exists(logger: Logger, mets_path: str) -> DBWorkspace:
    try:
        # Core cannot create workspaces via the API, but the Processing Server
        # needs the workspace in the database. The workspace is created if the
        # path exists locally but is not yet in the database, i.e. it has not
        # been uploaded through the Workspace Server.
        db_workspace = await db_create_workspace(mets_path)
        return db_workspace
    except FileNotFoundError as error:
        message = f"METS file path does not exist: {mets_path}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)
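
# Usage sketch (illustrative only): resolving a local METS file into a
# database-backed workspace entry; the path below is hypothetical.
#
#     db_workspace = await create_workspace_if_not_exists(
#         logger, mets_path="/data/workspace_1/mets.xml")
#     # Raises HTTP 404 if no METS file exists at that path.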


async def get_from_database_workflow_job(logger: Logger, workflow_job_id: str) -> DBWorkflowJob:
    try:
        workflow_job = await db_get_workflow_job(workflow_job_id)
        return workflow_job
    except ValueError as error:
        message = f"Workflow job with id '{workflow_job_id}' not found in the DB."
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)


async def get_from_database_workspace(
    logger: Logger,
    workspace_id: Optional[str] = None,
    workspace_mets_path: Optional[str] = None
) -> DBWorkspace:
    try:
        db_workspace = await db_get_workspace(workspace_id, workspace_mets_path)
        return db_workspace
    except ValueError as error:
        message = f"Workspace with id '{workspace_id}' or METS path '{workspace_mets_path}' not found in the DB."
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)


def get_page_ids_list(logger: Logger, mets_path: str, page_id: str) -> List[str]:
    try:
        if page_id:
            page_range = expand_page_ids(page_id)
        else:
            # If no page_id is specified, all physical pages are assigned as page range
            page_range = get_ocrd_workspace_physical_pages(mets_path=mets_path)
        return page_range
    except Exception as error:
        message = f"Failed to determine page range for METS path: {mets_path}"
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)
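
# Usage sketch (illustrative only): page_id follows the usual OCR-D
# page-range syntax handled by expand_page_ids, i.e. comma-separated ids
# and '..' ranges (the ids below are hypothetical):
#
#     pages = get_page_ids_list(logger, mets_path, "PHYS_0001..PHYS_0003")
#     # -> ["PHYS_0001", "PHYS_0002", "PHYS_0003"]
#     pages = get_page_ids_list(logger, mets_path, "")
#     # -> all physical pages listed in the METS file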


async def _get_processor_job(logger: Logger, job_id: str) -> PYJobOutput:
    """ Return processing job information from the database
    """
    try:
        job = await db_get_processing_job(job_id)
        return job.to_job_output()
    except ValueError as error:
        message = f"Processing job with id '{job_id}' does not exist."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


async def _get_processor_job_log(logger: Logger, job_id: str) -> FileResponse:
    job_output = await _get_processor_job(logger, job_id)
    log_file_path = Path(job_output.log_file_path)
    return FileResponse(path=log_file_path, filename=log_file_path.name)


def request_processor_server_tool_json(logger: Logger, processor_server_base_url: str) -> Dict:
    # Request the ocrd tool json from the Processor Server
    try:
        response = requests_get(
            urljoin(base=processor_server_base_url, url="info"),
            headers={"Content-Type": "application/json"}
        )
    except Exception as error:
        message = f"Failed to retrieve ocrd tool json from: {processor_server_base_url}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)
    if response.status_code != 200:
        message = f"Failed to retrieve ocrd tool json from: {processor_server_base_url}, code: {response.status_code}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message)
    return response.json()
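
# Note on the urljoin calls here and in forward_job_to_processor_server:
# urljoin replaces the last path segment when the base url lacks a trailing
# slash (example urls hypothetical):
#
#     urljoin(base="http://localhost:8080/pre", url="info")   # -> .../info
#     urljoin(base="http://localhost:8080/pre/", url="info")  # -> .../pre/info
#
# so a processor_server_base_url mounted under a sub-path is expected to
# end with a trailing slash.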


async def forward_job_to_processor_server(
    logger: Logger, job_input: PYJobInput, processor_server_base_url: str
) -> PYJobOutput:
    try:
        json_data = dumps(job_input.dict(exclude_unset=True, exclude_none=True))
    except Exception as error:
        message = f"Failed to json dump the PYJobInput: {job_input}"
        raise_http_exception(logger, status.HTTP_500_INTERNAL_SERVER_ERROR, message, error)

    # TODO: The amount of pages should come as a request input
    # TODO: cf https://github.com/OCR-D/core/pull/1030/files#r1152551161
    # currently, use 200 as a default
    request_timeout = calculate_processing_request_timeout(amount_pages=200, timeout_per_page=20.0)

    # Post a processing job to the Processor Server asynchronously
    async with AsyncClient(timeout=Timeout(timeout=request_timeout, connect=30.0)) as client:
        response = await client.post(
            urljoin(base=processor_server_base_url, url="run"),
            headers={"Content-Type": "application/json"},
            json=loads(json_data)
        )
    if response.status_code != 202:
        message = f"Failed to post '{job_input.processor_name}' job to: {processor_server_base_url}"
        raise_http_exception(logger, status.HTTP_500_INTERNAL_SERVER_ERROR, message)
    job_output = response.json()
    return job_output
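
# Usage sketch (illustrative only, assuming a validated PYJobInput and a
# hypothetical Processor Server url):
#
#     job_output = await forward_job_to_processor_server(
#         logger, job_input, "http://localhost:8080/")
#     # On HTTP 202 the returned dict mirrors the server's job output model.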


async def get_workflow_content(logger: Logger, workflow_id: str, workflow: Union[UploadFile, None]) -> str:
    if not workflow and not workflow_id:
        message = "Either 'workflow' must be uploaded as a file or 'workflow_id' must be provided. Both are missing."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
    if workflow_id:
        try:
            db_workflow = await db_get_workflow_script(workflow_id)
            return db_workflow.content
        except ValueError as error:
            message = f"Workflow with id '{workflow_id}' not found"
            raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)
    return await generate_workflow_content(workflow)


async def validate_and_return_mets_path(logger: Logger, job_input: PYJobInput) -> str:
    if job_input.workspace_id:
        db_workspace = await get_from_database_workspace(logger, job_input.workspace_id)
        return db_workspace.workspace_mets_path
    return job_input.path_to_mets


def parse_workflow_tasks(logger: Logger, workflow_content: str) -> List[ProcessorTask]:
    try:
        tasks_list = workflow_content.splitlines()
        return [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()]
    except ValueError as error:
        message = "Failed to parse processing tasks from the workflow."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


def raise_http_exception(logger: Logger, status_code: int, message: str, error: Optional[Exception] = None) -> None:
    if error:
        message = f"{message} {error}"
    logger.exception(message)
    raise HTTPException(status_code=status_code, detail=message)
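
# This helper always raises, so callers use it as the terminal statement of
# their except blocks; the optional error is appended to the response detail
# and the active traceback is logged via logger.exception. Sketch
# (risky_operation is hypothetical):
#
#     try:
#         risky_operation()
#     except ValueError as error:
#         raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, "Bad input", error)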


def validate_job_input(logger: Logger, processor_name: str, ocrd_tool: dict, job_input: PYJobInput) -> None:
    # logger.warning(f"Job input: {job_input}")
    if bool(job_input.path_to_mets) == bool(job_input.workspace_id):
        message = (
            "Wrong processing job input format. "
            "Either 'path_to_mets' or 'workspace_id' must be provided. "
            "Both are provided or both are missing."
        )
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
    if not ocrd_tool:
        message = f"Failed to find the ocrd tool json of processor: {processor_name}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message)
    try:
        report = ParameterValidator(ocrd_tool).validate(dict(job_input.parameters))
    except Exception as error:
        message = f"Failed to validate processing job input against the ocrd tool json of processor: {processor_name}"
        raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, message, error)
    if report and not report.is_valid:
        message = f"Failed to validate processing job input against the tool json of processor: {processor_name}\n"
        raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, f"{message}{report.errors}")
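
# The first check above enforces an exclusive-or: exactly one of
# 'path_to_mets' and 'workspace_id' must be set on the incoming job input.
# Sketch of a valid call (field values hypothetical; any further required
# PYJobInput fields omitted):
#
#     job_input = PYJobInput(processor_name="ocrd-dummy", workspace_id="workspace_1", parameters={})
#     validate_job_input(logger, "ocrd-dummy", ocrd_tool, job_input)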


def validate_workflow(logger: Logger, workflow: str) -> None:
    """
    Check whether the workflow is not empty and can be parsed into a list of ProcessorTask objects
    """
    if not workflow.strip():
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message="Workflow is invalid, empty.")
    try:
        tasks_list = workflow.splitlines()
        [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()]
    except ValueError as error:
        message = "Provided workflow script is invalid, failed to parse ProcessorTasks."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)
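
# A valid workflow script holds one processing task per line, in the task
# syntax also used by `ocrd process` (file group names hypothetical):
#
#     workflow = (
#         "cis-ocropy-binarize -I OCR-D-IMG -O OCR-D-BIN\n"
#         "tesserocr-recognize -I OCR-D-BIN -O OCR-D-OCR\n"
#     )
#     validate_workflow(logger, workflow)  # raises HTTP 422 if not parseable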


def validate_first_task_input_file_groups_existence(logger: Logger, mets_path: str, input_file_grps: List[str]):
    # Validate the input file groups of the first task in the workflow
    available_groups = Workspace(Resolver(), Path(mets_path).parent).mets.file_groups
    for group in input_file_grps:
        if group not in available_groups:
            message = f"Input file group '{group}' of the first processor not found: {input_file_grps}"
            raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
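
# Usage sketch (illustrative only): checking the first task of a parsed
# workflow against the workspace METS before execution (path hypothetical):
#
#     tasks = parse_workflow_tasks(logger, workflow_content)
#     validate_first_task_input_file_groups_existence(
#         logger, "/data/workspace_1/mets.xml", tasks[0].input_file_grps)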