ocrd_network.server_utils   C

Complexity

Total Complexity 55

Size/Duplication

Total Lines 285
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 55
eloc 216
dl 0
loc 285
rs 6
c 0
b 0
f 0

17 Functions

Rating   Name   Duplication   Size   Complexity  
A _get_processor_job_log() 0 4 1
A request_processor_server_tool_json() 0 14 3
A create_workspace_if_not_exists() 0 11 2
A get_page_ids_list() 0 11 3
A create_processing_message() 0 20 2
A _get_processor_job() 0 9 2
A get_from_database_workflow_job() 0 7 2
A get_from_database_workspace() 0 11 2
A forward_job_to_processor_server() 0 26 4
C kill_mets_server_zombies() 0 29 9
A validate_and_return_mets_path() 0 5 2
B validate_job_input() 0 20 6
A parse_workflow_tasks() 0 7 2
A validate_first_task_input_file_groups_existence() 0 7 3
B get_workflow_content() 0 15 7
A raise_http_exception() 0 5 2
A validate_workflow() 0 12 3

How to fix: Complexity

Complex classes like ocrd_network.server_utils often do a lot of different things. To break such a class down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
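
For illustration only, a minimal Extract Class sketch: the validate_* helpers in this module share a prefix and all take a logger, so they could move into one cohesive component. The class name below is hypothetical and not part of ocrd_network; the method bodies would simply be the existing module-level functions, minus the logger parameter.

from logging import Logger
from typing import List

from ocrd_network.models import PYJobInput


class RequestValidator:  # hypothetical name, for illustration only
    """Cohesive component holding the validate_* helpers and their shared logger."""

    def __init__(self, logger: Logger):
        self.logger = logger

    def validate_workflow(self, workflow: str) -> None:
        # body of the current module-level validate_workflow(logger, workflow)
        ...

    def validate_job_input(self, processor_name: str, ocrd_tool: dict, job_input: PYJobInput) -> None:
        # body of the current module-level validate_job_input(logger, ...)
        ...

    def validate_first_task_input_file_groups_existence(self, mets_path: str, input_file_grps: List[str]) -> None:
        # body of the current module-level function of the same name
        ...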

Module source

import os
import re
import signal
import sys
from pathlib import Path
from json import dumps, loads
from urllib.parse import urljoin
from typing import Dict, List, Optional, Union
from time import time

from fastapi import HTTPException, status, UploadFile
from fastapi.responses import FileResponse
from httpx import AsyncClient, Timeout
from logging import Logger
from requests import get as requests_get

from ocrd.resolver import Resolver
from ocrd.task_sequence import ProcessorTask
from ocrd.workspace import Workspace
from ocrd_validators import ParameterValidator

from .database import (
    db_create_workspace,
    db_get_processing_job,
    db_get_workflow_job,
    db_get_workflow_script,
    db_get_workspace
)
from .models import DBProcessorJob, DBWorkflowJob, DBWorkspace, PYJobInput, PYJobOutput
from .rabbitmq_utils import OcrdProcessingMessage
from .utils import (
    calculate_processing_request_timeout,
    expand_page_ids,
    generate_created_time,
    generate_workflow_content,
    get_ocrd_workspace_physical_pages
)


def create_processing_message(logger: Logger, job: DBProcessorJob) -> OcrdProcessingMessage:
    try:
        processing_message = OcrdProcessingMessage(
            job_id=job.job_id,
            processor_name=job.processor_name,
            created_time=generate_created_time(),
            path_to_mets=job.path_to_mets,
            workspace_id=job.workspace_id,
            input_file_grps=job.input_file_grps,
            output_file_grps=job.output_file_grps,
            page_id=job.page_id,
            parameters=job.parameters,
            result_queue_name=job.result_queue_name,
            callback_url=job.callback_url,
            internal_callback_url=job.internal_callback_url
        )
        return processing_message
    except ValueError as error:
        message = "Failed to create OcrdProcessingMessage from DBProcessorJob"
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


async def create_workspace_if_not_exists(logger: Logger, mets_path: str) -> DBWorkspace:
    try:
        # Core cannot create workspaces by API, but the Processing Server needs
        # the workspace in the database. The workspace is created if the path is
        # available locally but does not yet exist in the database - since it has
        # not been uploaded through the Workspace Server.
        db_workspace = await db_create_workspace(mets_path)
        return db_workspace
    except FileNotFoundError as error:
        message = f"Mets file path does not exist: {mets_path}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)


async def get_from_database_workflow_job(logger: Logger, workflow_job_id: str) -> DBWorkflowJob:
    try:
        workflow_job = await db_get_workflow_job(workflow_job_id)
        return workflow_job
    except ValueError as error:
        message = f"Workflow job with id '{workflow_job_id}' not found in the DB."
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)


async def get_from_database_workspace(
    logger: Logger,
    workspace_id: str = None,
    workspace_mets_path: str = None
) -> DBWorkspace:
    try:
        db_workspace = await db_get_workspace(workspace_id, workspace_mets_path)
        return db_workspace
    except ValueError as error:
        message = f"Workspace with id '{workspace_id}' not found in the DB."
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)


def get_page_ids_list(logger: Logger, mets_path: str, page_id: str) -> List[str]:
    try:
        if page_id:
            page_range = expand_page_ids(page_id)
        else:
            # If no page_id is specified, all physical pages are assigned as page range
            page_range = get_ocrd_workspace_physical_pages(mets_path=mets_path)
        return page_range
    except Exception as error:
        message = f"Failed to determine page range for mets path: {mets_path}"
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


async def _get_processor_job(logger: Logger, job_id: str) -> PYJobOutput:
    """ Return processing job information from the database
    """
    try:
        job = await db_get_processing_job(job_id)
        return job.to_job_output()
    except ValueError as error:
        message = f"Processing job with id '{job_id}' does not exist."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


async def _get_processor_job_log(logger: Logger, job_id: str) -> FileResponse:
    db_job = await _get_processor_job(logger, job_id)
    log_file_path = Path(db_job.log_file_path)
    return FileResponse(path=log_file_path, filename=log_file_path.name)


def request_processor_server_tool_json(logger: Logger, processor_server_base_url: str) -> Dict:
    # Request the ocrd tool json from the Processor Server
    try:
        response = requests_get(
            urljoin(base=processor_server_base_url, url="info"),
            headers={"Content-Type": "application/json"}
        )
    except Exception as error:
        message = f"Failed to retrieve ocrd tool json from: {processor_server_base_url}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)
    if response.status_code != 200:
        message = f"Failed to retrieve tool json from: {processor_server_base_url}, code: {response.status_code}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message)
    return response.json()


async def forward_job_to_processor_server(
    logger: Logger, job_input: PYJobInput, processor_server_base_url: str
) -> PYJobOutput:
    try:
        json_data = dumps(job_input.dict(exclude_unset=True, exclude_none=True))
    except Exception as error:
        message = f"Failed to json dump the PYJobInput: {job_input}"
        raise_http_exception(logger, status.HTTP_500_INTERNAL_SERVER_ERROR, message, error)

    # TODO: The amount of pages should come as a request input
    # TODO: cf https://github.com/OCR-D/core/pull/1030/files#r1152551161
    #  currently, use 200 as a default
    request_timeout = calculate_processing_request_timeout(amount_pages=200, timeout_per_page=20.0)

    # Post a processing job to the Processor Server asynchronously
    async with AsyncClient(timeout=Timeout(timeout=request_timeout, connect=30.0)) as client:
        response = await client.post(
            urljoin(base=processor_server_base_url, url="run"),
            headers={"Content-Type": "application/json"},
            json=loads(json_data)
        )
    if response.status_code != 202:
        message = f"Failed to post '{job_input.processor_name}' job to: {processor_server_base_url}"
        raise_http_exception(logger, status.HTTP_500_INTERNAL_SERVER_ERROR, message)
    job_output = response.json()
    return job_output


async def get_workflow_content(logger: Logger, workflow_id: str, workflow: Union[UploadFile, str, None]) -> str:
    if not workflow and not workflow_id:
        message = "Either 'workflow' must be uploaded as a file or 'workflow_id' must be provided. Both are missing."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
    if workflow_id:
        try:
            db_workflow = await db_get_workflow_script(workflow_id)
            return db_workflow.content
        except ValueError as error:
            message = f"Workflow with id '{workflow_id}' not found"
            raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)
    if isinstance(workflow, str):
        with open(workflow) as wf_file:
            return wf_file.read()
    return await generate_workflow_content(workflow)


async def validate_and_return_mets_path(logger: Logger, job_input: PYJobInput) -> str:
    if job_input.workspace_id:
        db_workspace = await get_from_database_workspace(logger, job_input.workspace_id)
        return db_workspace.workspace_mets_path
    return job_input.path_to_mets


def parse_workflow_tasks(logger: Logger, workflow_content: str) -> List[ProcessorTask]:
    try:
        tasks_list = workflow_content.splitlines()
        return [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()]
    except ValueError as error:
        message = "Failed parsing processing tasks from a workflow."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


def raise_http_exception(logger: Logger, status_code: int, message: str, error: Exception = None) -> None:
    if error:
        message = f"{message} {error}"
    logger.exception(message)
    raise HTTPException(status_code=status_code, detail=message)


def validate_job_input(logger: Logger, processor_name: str, ocrd_tool: dict, job_input: PYJobInput) -> None:
    # logger.warning(f"Job input: {job_input}")
    if bool(job_input.path_to_mets) == bool(job_input.workspace_id):
        message = (
            "Wrong processing job input format. "
            "Either 'path_to_mets' or 'workspace_id' must be provided. "
            "Both are provided or both are missing."
        )
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
    if not ocrd_tool:
        message = f"Failed to find the ocrd tool json of processor: {processor_name}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message)
    try:
        report = ParameterValidator(ocrd_tool).validate(dict(job_input.parameters))
    except Exception as error:
        message = f"Failed to validate processing job input against the ocrd tool json of processor: {processor_name}"
        raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, message, error)
    if report and not report.is_valid:
        message = f"Failed to validate processing job input against the tool json of processor: {processor_name}\n"
        raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, f"{message}{report.errors}")


def validate_workflow(logger: Logger, workflow: str) -> None:
    """
    Check whether the workflow is not empty and parseable to a list of ProcessorTask
    """
    if not workflow.strip():
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message="Workflow is invalid, empty.")
    try:
        tasks_list = workflow.splitlines()
        [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()]
    except ValueError as error:
        message = "Provided workflow script is invalid, failed to parse ProcessorTasks."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


def validate_first_task_input_file_groups_existence(logger: Logger, mets_path: str, input_file_grps: List[str]):
    # Validate the input file groups of the first task in the workflow
    available_groups = Workspace(Resolver(), Path(mets_path).parents[0]).mets.file_groups
    for group in input_file_grps:
        if group not in available_groups:
            message = f"Input file group '{group}' of the first processor not found: {input_file_grps}"
            raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)


def kill_mets_server_zombies(minutes_ago: Optional[int], dry_run: Optional[bool]) -> List[int]:
    if minutes_ago is None:
        minutes_ago = 90
    if dry_run is None:
        dry_run = False

    now = time()
    cmdline_pat = r'.*ocrd workspace -U.*server start $'
    ret = []
    # Scan /proc for METS Server processes started more than `minutes_ago` minutes ago
    for procdir in sorted(Path('/proc').glob('*'), key=os.path.getctime):
        if not procdir.is_dir():
            continue
        cmdline_file = procdir.joinpath('cmdline')
        if not cmdline_file.is_file():
            continue
        ctime_ago = int((now - procdir.stat().st_ctime) / 60)
        if ctime_ago < minutes_ago:
            continue
        cmdline = cmdline_file.read_text().replace('\x00', ' ')
        if re.match(cmdline_pat, cmdline):
            pid = int(procdir.name)
            ret.append(pid)
            print(f'METS Server with PID {pid} was created {ctime_ago} minutes ago, more than {minutes_ago}, '
                  f'so killing (cmdline="{cmdline}")', file=sys.stderr)
            if dry_run:
                print(f'[dry_run is active] kill {pid}')
            else:
                os.kill(pid, signal.SIGTERM)
    return ret
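
For context, a hypothetical usage sketch (not part of the analyzed module) showing how the workflow helpers above compose; the function name and the assumption that ProcessorTask exposes input_file_grps are the author's, not taken from this report.

from logging import getLogger

from ocrd_network.server_utils import (
    get_workflow_content,
    parse_workflow_tasks,
    validate_first_task_input_file_groups_existence,
    validate_workflow,
)

logger = getLogger(__name__)


async def run_workflow_checks(mets_path: str, workflow_id: str = None, workflow=None):
    # Hypothetical helper: resolve the workflow content (uploaded file or stored id),
    # validate it, parse it into ProcessorTasks, and check the first task's input
    # file groups against the workspace METS before submitting any jobs.
    workflow_content = await get_workflow_content(logger, workflow_id, workflow)
    validate_workflow(logger, workflow_content)
    tasks = parse_workflow_tasks(logger, workflow_content)
    # assumes ProcessorTask exposes `input_file_grps`
    validate_first_task_input_file_groups_existence(logger, mets_path, tasks[0].input_file_grps)
    return tasks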