import os
import re
import signal
import sys
from pathlib import Path
from json import dumps, loads
from urllib.parse import urljoin
from typing import Dict, List, Optional, Union
from time import time

from fastapi import HTTPException, status, UploadFile
from fastapi.responses import FileResponse
from httpx import AsyncClient, Timeout
from logging import Logger
from requests import get as requests_get

from ocrd.resolver import Resolver
from ocrd.task_sequence import ProcessorTask
from ocrd.workspace import Workspace
from ocrd_validators import ParameterValidator

from .database import (
    db_create_workspace,
    db_get_processing_job,
    db_get_workflow_job,
    db_get_workflow_script,
    db_get_workspace
)
from .models import DBProcessorJob, DBWorkflowJob, DBWorkspace, PYJobInput, PYJobOutput
from .rabbitmq_utils import OcrdProcessingMessage
from .utils import (
    calculate_processing_request_timeout,
    expand_page_ids,
    generate_created_time,
    generate_workflow_content,
    get_ocrd_workspace_physical_pages
)


def create_processing_message(logger: Logger, job: DBProcessorJob) -> OcrdProcessingMessage:
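    """Build an OcrdProcessingMessage (the queue payload) from a DBProcessorJob database entry."""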
    try:
        processing_message = OcrdProcessingMessage(
            job_id=job.job_id,
            processor_name=job.processor_name,
            created_time=generate_created_time(),
            path_to_mets=job.path_to_mets,
            workspace_id=job.workspace_id,
            input_file_grps=job.input_file_grps,
            output_file_grps=job.output_file_grps,
            page_id=job.page_id,
            parameters=job.parameters,
            result_queue_name=job.result_queue_name,
            callback_url=job.callback_url,
            internal_callback_url=job.internal_callback_url
        )
        return processing_message
    except ValueError as error:
        message = "Failed to create OcrdProcessingMessage from DBProcessorJob"
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


async def create_workspace_if_not_exists(logger: Logger, mets_path: str) -> DBWorkspace:
    try:
        # Core cannot create workspaces via the API, but the Processing Server needs
        # the workspace in the database. The workspace is created if the path is
        # available locally and does not yet exist in the database - i.e. it has not
        # been uploaded through the Workspace Server.
        db_workspace = await db_create_workspace(mets_path)
        return db_workspace
    except FileNotFoundError as error:
        message = f"METS file path does not exist: {mets_path}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)


async def get_from_database_workflow_job(logger: Logger, workflow_job_id: str) -> DBWorkflowJob:
    try:
        workflow_job = await db_get_workflow_job(workflow_job_id)
        return workflow_job
    except ValueError as error:
        message = f"Workflow job with id '{workflow_job_id}' not found in the DB."
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)


async def get_from_database_workspace(
    logger: Logger,
    workspace_id: Optional[str] = None,
    workspace_mets_path: Optional[str] = None
) -> DBWorkspace:
    try:
        db_workspace = await db_get_workspace(workspace_id, workspace_mets_path)
        return db_workspace
    except ValueError as error:
        message = f"Workspace with id '{workspace_id}' not found in the DB."
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)


def get_page_ids_list(logger: Logger, mets_path: str, page_id: str) -> List[str]:
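    """Expand the page_id expression into a list of page ids, or return all physical pages of the workspace if no page_id is given."""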
    try:
        if page_id:
            page_range = expand_page_ids(page_id)
        else:
            # If no page_id is specified, all physical pages are assigned as page range
            page_range = get_ocrd_workspace_physical_pages(mets_path=mets_path)
        return page_range
    except Exception as error:
        message = f"Failed to determine page range for mets path: {mets_path}"
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


async def _get_processor_job(logger: Logger, job_id: str) -> PYJobOutput:
    """ Return processing job information from the database
    """
    try:
        job = await db_get_processing_job(job_id)
        return job.to_job_output()
    except ValueError as error:
        message = f"Processing job with id '{job_id}' does not exist."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


async def _get_processor_job_log(logger: Logger, job_id: str) -> FileResponse:
    db_job = await _get_processor_job(logger, job_id)
    log_file_path = Path(db_job.log_file_path)
    return FileResponse(path=log_file_path, filename=log_file_path.name)


def request_processor_server_tool_json(logger: Logger, processor_server_base_url: str) -> Dict:
    # Request the ocrd tool json from the Processor Server
    try:
        response = requests_get(
            urljoin(base=processor_server_base_url, url="info"),
            headers={"Content-Type": "application/json"}
        )
    except Exception as error:
        message = f"Failed to retrieve ocrd tool json from: {processor_server_base_url}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)
    if response.status_code != 200:
        message = f"Failed to retrieve tool json from: {processor_server_base_url}, code: {response.status_code}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message)
    return response.json()


async def forward_job_to_processor_server(
    logger: Logger, job_input: PYJobInput, processor_server_base_url: str
) -> PYJobOutput:
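    """Serialize the job input and POST it asynchronously to the Processor Server's 'run' endpoint."""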
    try:
        json_data = dumps(job_input.dict(exclude_unset=True, exclude_none=True))
    except Exception as error:
        message = f"Failed to json dump the PYJobInput: {job_input}"
        raise_http_exception(logger, status.HTTP_500_INTERNAL_SERVER_ERROR, message, error)

    # TODO: The amount of pages should come as a request input
    # TODO: cf https://github.com/OCR-D/core/pull/1030/files#r1152551161
    # currently, use 200 as a default
    request_timeout = calculate_processing_request_timeout(amount_pages=200, timeout_per_page=20.0)

    # Post a processing job to the Processor Server asynchronously
    async with AsyncClient(timeout=Timeout(timeout=request_timeout, connect=30.0)) as client:
        response = await client.post(
            urljoin(base=processor_server_base_url, url="run"),
            headers={"Content-Type": "application/json"},
            json=loads(json_data)
        )
    if response.status_code != 202:
        message = f"Failed to post '{job_input.processor_name}' job to: {processor_server_base_url}"
        raise_http_exception(logger, status.HTTP_500_INTERNAL_SERVER_ERROR, message)
    job_output = response.json()
    return job_output


async def get_workflow_content(logger: Logger, workflow_id: str, workflow: Union[UploadFile, None]) -> str:
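    """Return the workflow script content, either from the database (via workflow_id) or from the uploaded file."""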
    if not workflow and not workflow_id:
        message = "Either 'workflow' must be uploaded as a file or 'workflow_id' must be provided. Both are missing."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
    if workflow_id:
        try:
            db_workflow = await db_get_workflow_script(workflow_id)
            return db_workflow.content
        except ValueError as error:
            message = f"Workflow with id '{workflow_id}' not found"
            raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)
    return await generate_workflow_content(workflow)


async def validate_and_return_mets_path(logger: Logger, job_input: PYJobInput) -> str:
    if job_input.workspace_id:
        db_workspace = await get_from_database_workspace(logger, job_input.workspace_id)
        return db_workspace.workspace_mets_path
    return job_input.path_to_mets


def parse_workflow_tasks(logger: Logger, workflow_content: str) -> List[ProcessorTask]:
    try:
        tasks_list = workflow_content.splitlines()
        return [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()]
    except ValueError as error:
        message = "Failed to parse processing tasks from the workflow."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


def raise_http_exception(logger: Logger, status_code: int, message: str, error: Optional[Exception] = None) -> None:
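    """Log the message (appending the error, if given) and raise a FastAPI HTTPException with the given status code."""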
    if error:
        message = f"{message} {error}"
    logger.exception(message)
    raise HTTPException(status_code=status_code, detail=message)


def validate_job_input(logger: Logger, processor_name: str, ocrd_tool: dict, job_input: PYJobInput) -> None:
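    """Check that exactly one of 'path_to_mets' or 'workspace_id' is given and that the job parameters validate against the processor's ocrd tool json."""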
    # logger.warning(f"Job input: {job_input}")
    if bool(job_input.path_to_mets) == bool(job_input.workspace_id):
        message = (
            "Wrong processing job input format. "
            "Either 'path_to_mets' or 'workspace_id' must be provided. "
            "Both are provided or both are missing."
        )
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
    if not ocrd_tool:
        message = f"Failed to find the ocrd tool json of processor: {processor_name}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message)
    try:
        report = ParameterValidator(ocrd_tool).validate(dict(job_input.parameters))
    except Exception as error:
        message = f"Failed to validate processing job input against the ocrd tool json of processor: {processor_name}"
        raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, message, error)
    if report and not report.is_valid:
        message = f"Failed to validate processing job input against the tool json of processor: {processor_name}\n"
        raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, f"{message}{report.errors}")


def validate_workflow(logger: Logger, workflow: str) -> None:
    """
    Check whether the workflow is non-empty and parseable into a list of ProcessorTask
    """
    if not workflow.strip():
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message="Workflow is invalid, empty.")
    try:
        tasks_list = workflow.splitlines()
        [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()]
    except ValueError as error:
        message = "Provided workflow script is invalid, failed to parse ProcessorTasks."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


def validate_first_task_input_file_groups_existence(logger: Logger, mets_path: str, input_file_grps: List[str]):
    # Validate the input file groups of the first task in the workflow
    available_groups = Workspace(Resolver(), Path(mets_path).parents[0]).mets.file_groups
    for group in input_file_grps:
        if group not in available_groups:
            message = f"Input file group '{group}' of the first processor not found: {input_file_grps}"
            raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)


def kill_mets_server_zombies(minutes_ago: Optional[int], dry_run: Optional[bool]) -> List[int]:
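    """Scan /proc for METS Server processes ('ocrd workspace -U ... server start') started more than
    minutes_ago minutes ago (default 90) and send them SIGTERM; with dry_run, only report them.
    Returns the list of matched PIDs."""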
    if minutes_ago is None:
        minutes_ago = 90
    if dry_run is None:
        dry_run = False

    now = time()
    cmdline_pat = r'.*ocrd workspace -U.*server start $'
    ret = []
    for procdir in sorted(Path('/proc').glob('*'), key=os.path.getctime):
        if not procdir.is_dir():
            continue
        cmdline_file = procdir.joinpath('cmdline')
        if not cmdline_file.is_file():
            continue
        ctime_ago = int((now - procdir.stat().st_ctime) / 60)
        if ctime_ago < minutes_ago:
            continue
        cmdline = cmdline_file.read_text().replace('\x00', ' ')
        if re.match(cmdline_pat, cmdline):
            pid = int(procdir.name)
            ret.append(pid)
            print(f'METS Server with PID {pid} was created {ctime_ago} minutes ago, more than {minutes_ago}, '
                  f'so killing (cmdline="{cmdline}")', file=sys.stderr)
            if dry_run:
                print(f'[dry_run is active] kill {pid}')
            else:
                os.kill(pid, signal.SIGTERM)
    return ret