from fastapi import HTTPException, status, UploadFile
from fastapi.responses import FileResponse
from httpx import AsyncClient, Timeout
from json import dumps, loads
from logging import Logger
from pathlib import Path
from requests import get as requests_get
from typing import Dict, List, Optional, Union
from urllib.parse import urljoin

from ocrd.resolver import Resolver
from ocrd.task_sequence import ProcessorTask
from ocrd.workspace import Workspace
from ocrd_validators import ParameterValidator

from .database import (
    db_create_workspace,
    db_get_processing_job,
    db_get_workflow_job,
    db_get_workflow_script,
    db_get_workspace
)
from .models import DBProcessorJob, DBWorkflowJob, DBWorkspace, PYJobInput, PYJobOutput
from .rabbitmq_utils import OcrdProcessingMessage
from .utils import (
    calculate_processing_request_timeout,
    expand_page_ids,
    generate_created_time,
    generate_workflow_content,
    get_ocrd_workspace_physical_pages
)


def create_processing_message(logger: Logger, job: DBProcessorJob) -> OcrdProcessingMessage:
    try:
        processing_message = OcrdProcessingMessage(
            job_id=job.job_id,
            processor_name=job.processor_name,
            created_time=generate_created_time(),
            path_to_mets=job.path_to_mets,
            workspace_id=job.workspace_id,
            input_file_grps=job.input_file_grps,
            output_file_grps=job.output_file_grps,
            page_id=job.page_id,
            parameters=job.parameters,
            result_queue_name=job.result_queue_name,
            callback_url=job.callback_url,
            internal_callback_url=job.internal_callback_url
        )
        return processing_message
    except ValueError as error:
        message = f"Failed to create OcrdProcessingMessage from DBProcessorJob: {job.job_id}"
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


async def create_workspace_if_not_exists(logger: Logger, mets_path: str) -> DBWorkspace:
    try:
        # Core cannot create workspaces by API, but the Processing Server needs
        # the workspace in the database. The workspace is created if the path
        # is available locally and does not yet exist in the database - since
        # it has not been uploaded through the Workspace Server.
        db_workspace = await db_create_workspace(mets_path)
        return db_workspace
    except FileNotFoundError as error:
        message = f"Mets file path does not exist: {mets_path}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)


async def get_from_database_workflow_job(logger: Logger, workflow_job_id: str) -> DBWorkflowJob:
    try:
        workflow_job = await db_get_workflow_job(workflow_job_id)
        return workflow_job
    except ValueError as error:
        message = f"Workflow job with id '{workflow_job_id}' not found in the DB."
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)


async def get_from_database_workspace(
    logger: Logger,
    workspace_id: Optional[str] = None,
    workspace_mets_path: Optional[str] = None
) -> DBWorkspace:
    try:
        db_workspace = await db_get_workspace(workspace_id, workspace_mets_path)
        return db_workspace
    except ValueError as error:
        message = f"Workspace with id '{workspace_id}' or mets path '{workspace_mets_path}' not found in the DB."
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)


def get_page_ids_list(logger: Logger, mets_path: str, page_id: str) -> List[str]:
    try:
        if page_id:
            page_range = expand_page_ids(page_id)
        else:
            # If no page_id is specified, all physical pages are assigned as the page range
            page_range = get_ocrd_workspace_physical_pages(mets_path=mets_path)
        return page_range
    except Exception as error:
        message = f"Failed to determine the page range for mets path: {mets_path}"
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)
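
# A brief illustration (based on the OCR-D page range conventions; the exact
# syntax is defined by expand_page_ids in .utils): a page_id value such as
# "PHYS_0001..PHYS_0003,PHYS_0010" would expand to the list
# ["PHYS_0001", "PHYS_0002", "PHYS_0003", "PHYS_0010"].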


async def _get_processor_job(logger: Logger, job_id: str) -> PYJobOutput:
    """ Return processing job information from the database
    """
    try:
        job = await db_get_processing_job(job_id)
        return job.to_job_output()
    except ValueError as error:
        message = f"Processing job with id '{job_id}' does not exist."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


async def _get_processor_job_log(logger: Logger, job_id: str) -> FileResponse:
    db_job = await _get_processor_job(logger, job_id)
    log_file_path = Path(db_job.log_file_path)
    return FileResponse(path=log_file_path, filename=log_file_path.name)


def request_processor_server_tool_json(logger: Logger, processor_server_base_url: str) -> Dict:
    # Request the ocrd tool json from the Processor Server
    try:
        response = requests_get(
            urljoin(base=processor_server_base_url, url="info"),
            headers={"Content-Type": "application/json"}
        )
    except Exception as error:
        message = f"Failed to retrieve ocrd tool json from: {processor_server_base_url}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)
    if response.status_code != 200:
        message = f"Failed to retrieve tool json from: {processor_server_base_url}, code: {response.status_code}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message)
    return response.json()
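
# Note (standard urllib.parse.urljoin semantics, not specific to this module):
# the last path segment of the base URL is dropped unless the base ends with
# a slash, e.g.
#   urljoin("http://localhost:8080/", "info")      -> "http://localhost:8080/info"
#   urljoin("http://localhost:8080/proc", "info")  -> "http://localhost:8080/info"
#   urljoin("http://localhost:8080/proc/", "info") -> "http://localhost:8080/proc/info"
# so processor_server_base_url is expected to end with "/" if it carries a
# path component. The hosts and ports above are made-up examples.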


async def forward_job_to_processor_server(
    logger: Logger, job_input: PYJobInput, processor_server_base_url: str
) -> PYJobOutput:
    try:
        json_data = dumps(job_input.dict(exclude_unset=True, exclude_none=True))
    except Exception as error:
        message = f"Failed to json dump the PYJobInput: {job_input}"
        raise_http_exception(logger, status.HTTP_500_INTERNAL_SERVER_ERROR, message, error)

    # TODO: The amount of pages should come as a request input
    # TODO: cf https://github.com/OCR-D/core/pull/1030/files#r1152551161
    # currently, use 200 as a default
    request_timeout = calculate_processing_request_timeout(amount_pages=200, timeout_per_page=20.0)

    # Post a processing job to the Processor Server asynchronously
    async with AsyncClient(timeout=Timeout(timeout=request_timeout, connect=30.0)) as client:
        response = await client.post(
            urljoin(base=processor_server_base_url, url="run"),
            headers={"Content-Type": "application/json"},
            json=loads(json_data)
        )
    if response.status_code != 202:
        message = f"Failed to post '{job_input.processor_name}' job to: {processor_server_base_url}"
        raise_http_exception(logger, status.HTTP_500_INTERNAL_SERVER_ERROR, message)
    job_output = response.json()
    return job_output
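
# Worked example for the default timeout above, assuming
# calculate_processing_request_timeout simply multiplies the page count by the
# per-page timeout (an assumption about the helper in .utils):
# 200 pages * 20.0 s/page = 4000 s total request timeout.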


async def get_workflow_content(logger: Logger, workflow_id: str, workflow: Union[UploadFile, None]) -> str:
    if not workflow and not workflow_id:
        message = "Either 'workflow' must be uploaded as a file or 'workflow_id' must be provided. Both are missing."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
    if workflow_id:
        try:
            db_workflow = await db_get_workflow_script(workflow_id)
            return db_workflow.content
        except ValueError as error:
            message = f"Workflow with id '{workflow_id}' not found"
            raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)
    return await generate_workflow_content(workflow)


async def validate_and_return_mets_path(logger: Logger, job_input: PYJobInput) -> str:
    if job_input.workspace_id:
        db_workspace = await get_from_database_workspace(logger, job_input.workspace_id)
        return db_workspace.workspace_mets_path
    return job_input.path_to_mets


def parse_workflow_tasks(logger: Logger, workflow_content: str) -> List[ProcessorTask]:
    try:
        tasks_list = workflow_content.splitlines()
        return [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()]
    except ValueError as error:
        message = "Failed to parse processing tasks from the workflow."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)
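
# Illustrative workflow script accepted by the parser above - one
# `ocrd process`-style task per line, e.g.:
#   cis-ocropy-binarize -I OCR-D-IMG -O OCR-D-BIN
#   tesserocr-recognize -I OCR-D-BIN -O OCR-D-OCR
# The authoritative task syntax is whatever ProcessorTask.parse accepts.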


def raise_http_exception(logger: Logger, status_code: int, message: str, error: Optional[Exception] = None) -> None:
    if error:
        message = f"{message} {error}"
    logger.exception(message)
    raise HTTPException(status_code=status_code, detail=message)


def validate_job_input(logger: Logger, processor_name: str, ocrd_tool: dict, job_input: PYJobInput) -> None:
    # logger.warning(f"Job input: {job_input}")
    if bool(job_input.path_to_mets) == bool(job_input.workspace_id):
        message = (
            "Wrong processing job input format. "
            "Either 'path_to_mets' or 'workspace_id' must be provided. "
            "Both are provided or both are missing."
        )
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
    if not ocrd_tool:
        message = f"Failed to find the ocrd tool json of processor: {processor_name}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message)
    try:
        report = ParameterValidator(ocrd_tool).validate(dict(job_input.parameters))
    except Exception as error:
        message = f"Failed to validate processing job input against the ocrd tool json of processor: {processor_name}"
        raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, message, error)
    if report and not report.is_valid:
        message = f"Failed to validate processing job input against the tool json of processor: {processor_name}\n"
        raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, f"{message}{report.errors}")
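
# Illustrative inputs for the mutual-exclusion check above (field values are
# made up): a job input carrying only path_to_mets="/data/ws/mets.xml", or
# only workspace_id="ws42", passes; setting both, or neither, is rejected
# with HTTP 422.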


def validate_workflow(logger: Logger, workflow: str) -> None:
    """
    Check whether the workflow is non-empty and parseable into a list of ProcessorTask
    """
    if not workflow.strip():
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message="Workflow is invalid, empty.")
    try:
        tasks_list = workflow.splitlines()
        [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()]
    except ValueError as error:
        message = "Provided workflow script is invalid, failed to parse ProcessorTasks."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


def validate_first_task_input_file_groups_existence(logger: Logger, mets_path: str, input_file_grps: List[str]) -> None:
    # Validate the input file groups of the first task in the workflow
    available_groups = Workspace(Resolver(), Path(mets_path).parent).mets.file_groups
    for group in input_file_grps:
        if group not in available_groups:
            message = f"Input file group '{group}' of the first processor not found: {input_file_grps}"
            raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)