import os
import re
import signal
import sys
from pathlib import Path
from json import dumps, loads
from urllib.parse import urljoin
from typing import Dict, List, Optional, Union
from time import time

from fastapi import HTTPException, status, UploadFile
from fastapi.responses import FileResponse
from httpx import AsyncClient, Timeout
from logging import Logger
from requests import get as requests_get

from ocrd.resolver import Resolver
from ocrd.task_sequence import ProcessorTask
from ocrd.workspace import Workspace
from ocrd_validators import ParameterValidator

from .database import (
    db_create_workspace,
    db_get_processing_job,
    db_get_workflow_job,
    db_get_workflow_script,
    db_get_workspace
)
from .models import DBProcessorJob, DBWorkflowJob, DBWorkspace, PYJobInput, PYJobOutput
from .rabbitmq_utils import OcrdProcessingMessage
from .utils import (
    calculate_processing_request_timeout,
    expand_page_ids,
    generate_created_time,
    generate_workflow_content,
    get_ocrd_workspace_physical_pages
)


def create_processing_message(logger: Logger, job: DBProcessorJob) -> OcrdProcessingMessage:
    try:
        processing_message = OcrdProcessingMessage(
            job_id=job.job_id,
            processor_name=job.processor_name,
            created_time=generate_created_time(),
            path_to_mets=job.path_to_mets,
            workspace_id=job.workspace_id,
            input_file_grps=job.input_file_grps,
            output_file_grps=job.output_file_grps,
            page_id=job.page_id,
            parameters=job.parameters,
            result_queue_name=job.result_queue_name,
            callback_url=job.callback_url,
            internal_callback_url=job.internal_callback_url
        )
        return processing_message
    except ValueError as error:
        message = "Failed to create OcrdProcessingMessage from DBProcessorJob"
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


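# Usage sketch (hypothetical; the publishing side lives outside this module):
#   message = create_processing_message(logger, db_job)
#   # `message` would then be published to the RabbitMQ queue of
#   # db_job.processor_name, with results reported via the job's
#   # result_queue_name and callback_url.

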
async def create_workspace_if_not_exists(logger: Logger, mets_path: str) -> DBWorkspace:
    try:
        # Core cannot create workspaces via its API, but the Processing Server
        # needs the workspace in the database. The workspace is created if the
        # path is available locally and does not yet exist in the database,
        # since it has not been uploaded through the Workspace Server.
        db_workspace = await db_create_workspace(mets_path)
        return db_workspace
    except FileNotFoundError as error:
        message = f"METS file path does not exist: {mets_path}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)


async def get_from_database_workflow_job(logger: Logger, workflow_job_id: str) -> DBWorkflowJob:
    try:
        workflow_job = await db_get_workflow_job(workflow_job_id)
        return workflow_job
    except ValueError as error:
        message = f"Workflow job with id '{workflow_job_id}' not found in the DB."
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)


async def get_from_database_workspace(
    logger: Logger,
    workspace_id: Optional[str] = None,
    workspace_mets_path: Optional[str] = None
) -> DBWorkspace:
    try:
        db_workspace = await db_get_workspace(workspace_id, workspace_mets_path)
        return db_workspace
    except ValueError as error:
        message = f"Workspace with id '{workspace_id}' not found in the DB."
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)


def get_page_ids_list(logger: Logger, mets_path: str, page_id: str) -> List[str]:
    try:
        if page_id:
            page_range = expand_page_ids(page_id)
        else:
            # If no page_id is specified, all physical pages are assigned as page range
            page_range = get_ocrd_workspace_physical_pages(mets_path=mets_path)
        return page_range
    except Exception as error:
        message = f"Failed to determine page range for mets path: {mets_path}"
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


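# Page-range sketch (hypothetical page IDs; assumes expand_page_ids accepts
# comma-separated IDs and ".." ranges as in ocrd_utils):
#   get_page_ids_list(logger, mets_path, "PHYS_0001..PHYS_0003")
#     -> ["PHYS_0001", "PHYS_0002", "PHYS_0003"]
#   get_page_ids_list(logger, mets_path, None)
#     -> all physical pages listed in the METS

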
async def _get_processor_job(logger: Logger, job_id: str) -> PYJobOutput:
    """ Return processing job information from the database
    """
    try:
        job = await db_get_processing_job(job_id)
        return job.to_job_output()
    except ValueError as error:
        message = f"Processing job with id '{job_id}' does not exist."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


async def _get_processor_job_log(logger: Logger, job_id: str) -> FileResponse:
    db_job = await _get_processor_job(logger, job_id)
    log_file_path = Path(db_job.log_file_path)
    return FileResponse(path=log_file_path, filename=log_file_path.name)


async def get_workflow_content(logger: Logger, workflow_id: str, workflow: Union[UploadFile, str, None]) -> str:
    if not workflow and not workflow_id:
        message = "Either 'workflow' must be uploaded as a file or 'workflow_id' must be provided. Both are missing."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
    if workflow_id:
        try:
            db_workflow = await db_get_workflow_script(workflow_id)
            return db_workflow.content
        except ValueError as error:
            message = f"Workflow with id '{workflow_id}' not found"
            raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)
    if isinstance(workflow, str):
        with open(workflow) as wf_file:
            return wf_file.read()
    return await generate_workflow_content(workflow)


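# Resolution order implemented above (hypothetical values):
#   await get_workflow_content(logger, "wf-123", None)        # stored script from the DB
#   await get_workflow_content(logger, None, "/data/wf.txt")  # server-local file path
#   await get_workflow_content(logger, None, upload_file)     # uploaded UploadFile

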
async def validate_and_return_mets_path(logger: Logger, job_input: PYJobInput) -> str:
    if job_input.workspace_id:
        db_workspace = await get_from_database_workspace(logger, job_input.workspace_id)
        return db_workspace.workspace_mets_path
    return job_input.path_to_mets


def parse_workflow_tasks(logger: Logger, workflow_content: str) -> List[ProcessorTask]:
    try:
        tasks_list = workflow_content.splitlines()
        return [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()]
    except ValueError as error:
        message = "Failed to parse processing tasks from the workflow."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


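# Workflow script format sketch (hypothetical tasks; the exact syntax is
# whatever ProcessorTask.parse accepts, one task per non-empty line):
#   cis-ocropy-binarize -I OCR-D-IMG -O OCR-D-BIN
#   tesserocr-recognize -I OCR-D-BIN -O OCR-D-OCR -P segmentation_level region

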
def raise_http_exception(logger: Logger, status_code: int, message: str, error: Exception = None) -> None:
    if error:
        message = f"{message} {error}"
    logger.exception(message)
    raise HTTPException(status_code=status_code, detail=message)


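# All validation helpers in this module funnel failures through
# raise_http_exception: the message is logged with traceback context and
# re-raised as a FastAPI HTTPException carrying the same detail, e.g.:
#   raise_http_exception(logger, status.HTTP_404_NOT_FOUND, "Workspace not found")

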
def validate_job_input(logger: Logger, processor_name: str, ocrd_tool: dict, job_input: PYJobInput) -> None:
    # logger.warning(f"Job input: {job_input}")
    if bool(job_input.path_to_mets) == bool(job_input.workspace_id):
        message = (
            "Wrong processing job input format. "
            "Either 'path_to_mets' or 'workspace_id' must be provided. "
            "Both are provided or both are missing."
        )
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
    if not ocrd_tool:
        message = f"Failed to find the ocrd tool json of processor: {processor_name}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message)
    try:
        report = ParameterValidator(ocrd_tool).validate(dict(job_input.parameters))
    except Exception as error:
        message = f"Failed to validate processing job input against the ocrd tool json of processor: {processor_name}"
        raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, message, error)
    if report and not report.is_valid:
        message = f"Failed to validate processing job input against the tool json of processor: {processor_name}\n"
        raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, f"{message}{report.errors}")


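# Mutual-exclusion rule enforced above (hypothetical field values):
#   PYJobInput(path_to_mets="/data/mets.xml", ...)  -> accepted
#   PYJobInput(workspace_id="ws-42", ...)           -> accepted
#   both set, or neither set                        -> HTTP 422

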
def validate_workflow(logger: Logger, workflow: str) -> None:
    """
    Check whether the workflow is not empty and parseable to a list of ProcessorTasks
    """
    if not workflow.strip():
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message="Workflow is invalid, empty.")
    try:
        tasks_list = workflow.splitlines()
        [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()]
    except ValueError as error:
        message = "Provided workflow script is invalid, failed to parse ProcessorTasks."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


def validate_first_task_input_file_groups_existence(logger: Logger, mets_path: str, input_file_grps: List[str]):
    # Validate the input file groups of the first task in the workflow
    available_groups = Workspace(Resolver(), Path(mets_path).parents[0]).mets.file_groups
    for group in input_file_grps:
        if group not in available_groups:
            message = f"Input file group '{group}' of the first processor not found: {input_file_grps}"
            raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)


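# Example (hypothetical group name): if the first task of a workflow reads
# -I OCR-D-IMG, the workspace METS must already contain a file group named
# OCR-D-IMG, since no earlier task exists to produce it.

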
def kill_mets_server_zombies(minutes_ago: Optional[int], dry_run: Optional[bool]) -> List[int]:
    if minutes_ago is None:
        minutes_ago = 90
    if dry_run is None:
        dry_run = False

    now = time()
    # /proc/<pid>/cmdline entries are NUL-separated, so after replacing NUL
    # with a space the command line ends with a trailing space before "$"
    cmdline_pat = r'.*ocrd workspace -U.*server start $'
    ret = []
    for procdir in sorted(Path('/proc').glob('*'), key=os.path.getctime):
        if not procdir.is_dir():
            continue
        cmdline_file = procdir.joinpath('cmdline')
        if not cmdline_file.is_file():
            continue
        ctime_ago = int((now - procdir.stat().st_ctime) / 60)
        if ctime_ago < minutes_ago:
            continue
        cmdline = cmdline_file.read_text().replace('\x00', ' ')
        if re.match(cmdline_pat, cmdline):
            pid = int(procdir.name)
            ret.append(pid)
            print(f'METS Server with PID {pid} was created {ctime_ago} minutes ago, more than {minutes_ago}, '
                  f'so killing (cmdline="{cmdline}")', file=sys.stderr)
            if dry_run:
                print(f'[dry_run is active] kill {pid}')
            else:
                os.kill(pid, signal.SIGTERM)
    return ret
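# Dry-run sketch (hypothetical invocation): list METS Server processes older
# than 60 minutes without actually killing them:
#   pids = kill_mets_server_zombies(minutes_ago=60, dry_run=True)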