ocrd_network.server_utils (rating: C)

Complexity

Total Complexity 53

Size/Duplication

Total Lines 280
Duplicated Lines 0 %

Importance

Changes 0

Metric   Value
wmc      53
eloc     212
dl       0
loc      280
rs       6.96
c        0
b        0
f        0

17 Functions

Rating   Name   Duplication   Size   Complexity  
A validate_and_return_mets_path() 0 5 2
A _get_processor_job_log() 0 4 1
A forward_job_to_processor_server() 0 26 4
B validate_job_input() 0 20 6
A request_processor_server_tool_json() 0 14 3
A create_workspace_if_not_exists() 0 11 2
A parse_workflow_tasks() 0 7 2
A validate_first_task_input_file_groups_existence() 0 7 3
A get_workflow_content() 0 12 5
A get_page_ids_list() 0 11 3
A raise_http_exception() 0 5 2
A create_processing_message() 0 20 2
A _get_processor_job() 0 9 2
A get_from_database_workflow_job() 0 7 2
A get_from_database_workspace() 0 11 2
A validate_workflow() 0 12 3
C kill_mets_server_zombies() 0 28 9

How to fix complexity

Complex classes like ocrd_network.server_utils often do a lot of different things. To break such a class down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined which members belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
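
For example, the two helpers that talk to a Processor Server, request_processor_server_tool_json() and forward_job_to_processor_server(), share the same processor_server_base_url and could be pulled into one small client class. The following is only a sketch of that Extract Class step; the name ProcessorServerClient and its method are illustrative and not part of ocrd_network:

from logging import Logger
from urllib.parse import urljoin

from requests import get as requests_get


class ProcessorServerClient:
    # Hypothetical extraction target for the Processor Server HTTP helpers.

    def __init__(self, logger: Logger, base_url: str) -> None:
        self.logger = logger
        self.base_url = base_url

    def get_tool_json(self) -> dict:
        # Same request as request_processor_server_tool_json(), but the base URL
        # now lives on the instance instead of being passed to every call.
        # A real refactoring would keep the raise_http_exception() error handling.
        response = requests_get(
            urljoin(base=self.base_url, url="info"),
            headers={"Content-Type": "application/json"}
        )
        response.raise_for_status()
        return response.json()

forward_job_to_processor_server() could move onto the same class in a second step, which would also remove the repeated processor_server_base_url parameter from the module-level API.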

import os
import re
import signal
import sys
from pathlib import Path
from json import dumps, loads
from urllib.parse import urljoin
from typing import Dict, List, Optional, Union
from time import time

from fastapi import HTTPException, status, UploadFile
from fastapi.responses import FileResponse
from httpx import AsyncClient, Timeout
from logging import Logger
from requests import get as requests_get

from ocrd.resolver import Resolver
from ocrd.task_sequence import ProcessorTask
from ocrd.workspace import Workspace
from ocrd_validators import ParameterValidator

from .database import (
    db_create_workspace,
    db_get_processing_job,
    db_get_workflow_job,
    db_get_workflow_script,
    db_get_workspace
)
from .models import DBProcessorJob, DBWorkflowJob, DBWorkspace, PYJobInput, PYJobOutput
from .rabbitmq_utils import OcrdProcessingMessage
from .utils import (
    calculate_processing_request_timeout,
    expand_page_ids,
    generate_created_time,
    generate_workflow_content,
    get_ocrd_workspace_physical_pages
)


def create_processing_message(logger: Logger, job: DBProcessorJob) -> OcrdProcessingMessage:
    try:
        processing_message = OcrdProcessingMessage(
            job_id=job.job_id,
            processor_name=job.processor_name,
            created_time=generate_created_time(),
            path_to_mets=job.path_to_mets,
            workspace_id=job.workspace_id,
            input_file_grps=job.input_file_grps,
            output_file_grps=job.output_file_grps,
            page_id=job.page_id,
            parameters=job.parameters,
            result_queue_name=job.result_queue_name,
            callback_url=job.callback_url,
            internal_callback_url=job.internal_callback_url
        )
        return processing_message
    except ValueError as error:
        message = "Failed to create OcrdProcessingMessage from DBProcessorJob"
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


async def create_workspace_if_not_exists(logger: Logger, mets_path: str) -> DBWorkspace:
    try:
        # Core cannot create workspaces by API, but the Processing Server needs
        # the workspace in the database. The workspace is created if the path is
        # available locally and not existing in the database - since it has not
        # been uploaded through the Workspace Server.
        db_workspace = await db_create_workspace(mets_path)
        return db_workspace
    except FileNotFoundError as error:
        message = f"Mets file path not existing: {mets_path}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)


async def get_from_database_workflow_job(logger: Logger, workflow_job_id: str) -> DBWorkflowJob:
    try:
        workflow_job = await db_get_workflow_job(workflow_job_id)
        return workflow_job
    except ValueError as error:
        message = f"Workflow job with id '{workflow_job_id}' not found in the DB."
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)


async def get_from_database_workspace(
    logger: Logger,
    workspace_id: Optional[str] = None,
    workspace_mets_path: Optional[str] = None
) -> DBWorkspace:
    try:
        db_workspace = await db_get_workspace(workspace_id, workspace_mets_path)
        return db_workspace
    except ValueError as error:
        message = f"Workspace with id '{workspace_id}' not found in the DB."
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)


def get_page_ids_list(logger: Logger, mets_path: str, page_id: str) -> List[str]:
    try:
        if page_id:
            page_range = expand_page_ids(page_id)
        else:
            # If no page_id is specified, all physical pages are assigned as page range
            page_range = get_ocrd_workspace_physical_pages(mets_path=mets_path)
        return page_range
    except Exception as error:
        message = f"Failed to determine page range for mets path: {mets_path}"
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


async def _get_processor_job(logger: Logger, job_id: str) -> PYJobOutput:
    """ Return processing job information from the database
    """
    try:
        job = await db_get_processing_job(job_id)
        return job.to_job_output()
    except ValueError as error:
        message = f"Processing job with id '{job_id}' not existing."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


async def _get_processor_job_log(logger: Logger, job_id: str) -> FileResponse:
    db_job = await _get_processor_job(logger, job_id)
    log_file_path = Path(db_job.log_file_path)
    return FileResponse(path=log_file_path, filename=log_file_path.name)


def request_processor_server_tool_json(logger: Logger, processor_server_base_url: str) -> Dict:
    # Request the ocrd tool json from the Processor Server
    try:
        response = requests_get(
            urljoin(base=processor_server_base_url, url="info"),
            headers={"Content-Type": "application/json"}
        )
    except Exception as error:
        message = f"Failed to retrieve ocrd tool json from: {processor_server_base_url}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)
    if response.status_code != 200:
        message = f"Failed to retrieve tool json from: {processor_server_base_url}, code: {response.status_code}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message)
    return response.json()


async def forward_job_to_processor_server(
    logger: Logger, job_input: PYJobInput, processor_server_base_url: str
) -> PYJobOutput:
    try:
        json_data = dumps(job_input.dict(exclude_unset=True, exclude_none=True))
    except Exception as error:
        message = f"Failed to json dump the PYJobInput: {job_input}"
        raise_http_exception(logger, status.HTTP_500_INTERNAL_SERVER_ERROR, message, error)

    # TODO: The amount of pages should come as a request input
    # TODO: cf https://github.com/OCR-D/core/pull/1030/files#r1152551161
    #  currently, use 200 as a default
    request_timeout = calculate_processing_request_timeout(amount_pages=200, timeout_per_page=20.0)

    # Post a processing job to the Processor Server asynchronously
    async with AsyncClient(timeout=Timeout(timeout=request_timeout, connect=30.0)) as client:
        response = await client.post(
            urljoin(base=processor_server_base_url, url="run"),
            headers={"Content-Type": "application/json"},
            json=loads(json_data)
        )
    if response.status_code != 202:
        message = f"Failed to post '{job_input.processor_name}' job to: {processor_server_base_url}"
        raise_http_exception(logger, status.HTTP_500_INTERNAL_SERVER_ERROR, message)
    job_output = response.json()
    return job_output


async def get_workflow_content(logger: Logger, workflow_id: str, workflow: Union[UploadFile, None]) -> str:
    if not workflow and not workflow_id:
        message = "Either 'workflow' must be uploaded as a file or 'workflow_id' must be provided. Both are missing."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
    if workflow_id:
        try:
            db_workflow = await db_get_workflow_script(workflow_id)
            return db_workflow.content
        except ValueError as error:
            message = f"Workflow with id '{workflow_id}' not found"
            raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)
    return await generate_workflow_content(workflow)


async def validate_and_return_mets_path(logger: Logger, job_input: PYJobInput) -> str:
    if job_input.workspace_id:
        db_workspace = await get_from_database_workspace(logger, job_input.workspace_id)
        return db_workspace.workspace_mets_path
    return job_input.path_to_mets


def parse_workflow_tasks(logger: Logger, workflow_content: str) -> List[ProcessorTask]:
    try:
        tasks_list = workflow_content.splitlines()
        return [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()]
    except ValueError as error:
        message = "Failed parsing processing tasks from a workflow."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


def raise_http_exception(logger: Logger, status_code: int, message: str, error: Exception = None) -> None:
    if error:
        message = f"{message} {error}"
    logger.exception(message)
    raise HTTPException(status_code=status_code, detail=message)


def validate_job_input(logger: Logger, processor_name: str, ocrd_tool: dict, job_input: PYJobInput) -> None:
    # logger.warning(f"Job input: {job_input}")
    if bool(job_input.path_to_mets) == bool(job_input.workspace_id):
        message = (
            "Wrong processing job input format. "
            "Either 'path_to_mets' or 'workspace_id' must be provided. "
            "Both are provided or both are missing."
        )
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
    if not ocrd_tool:
        message = f"Missing ocrd tool json of processor: {processor_name}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message)
    try:
        report = ParameterValidator(ocrd_tool).validate(dict(job_input.parameters))
    except Exception as error:
        message = f"Failed to validate processing job input against the ocrd tool json of processor: {processor_name}"
        raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, message, error)
    if report and not report.is_valid:
        message = f"Failed to validate processing job input against the tool json of processor: {processor_name}\n"
        raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, f"{message}{report.errors}")


def validate_workflow(logger: Logger, workflow: str) -> None:
    """
    Check whether the workflow is not empty and parseable to a list of ProcessorTask
    """
    if not workflow.strip():
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message="Workflow is invalid, empty.")
    try:
        tasks_list = workflow.splitlines()
        [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()]
    except ValueError as error:
        message = "Provided workflow script is invalid, failed to parse ProcessorTasks."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


def validate_first_task_input_file_groups_existence(logger: Logger, mets_path: str, input_file_grps: List[str]):
    # Validate the input file groups of the first task in the workflow
    available_groups = Workspace(Resolver(), Path(mets_path).parents[0]).mets.file_groups
    for group in input_file_grps:
        if group not in available_groups:
            message = f"Input file group '{group}' of the first processor not found: {input_file_grps}"
            raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)


def kill_mets_server_zombies(minutes_ago: Optional[int], dry_run: Optional[bool]) -> List[int]:
    if minutes_ago is None:
        minutes_ago = 90
    if dry_run is None:
        dry_run = False

    now = time()
    cmdline_pat = r'.*ocrd workspace -U.*server start $'
    ret = []
    for procdir in sorted(Path('/proc').glob('*'), key=os.path.getctime):
        if not procdir.is_dir():
            continue
        cmdline_file = procdir.joinpath('cmdline')
        if not cmdline_file.is_file():
            continue
        ctime_ago = int((now - procdir.stat().st_ctime) / 60)
        if ctime_ago < minutes_ago:
            continue
        cmdline = cmdline_file.read_text().replace('\x00', ' ')
        if re.match(cmdline_pat, cmdline):
            pid = int(procdir.name)
            ret.append(pid)
            print(f'METS Server with PID {pid} was created {ctime_ago} minutes ago, more than {minutes_ago}, so killing (cmdline="{cmdline}")', file=sys.stderr)
            if dry_run:
                print(f'[dry_run is active] kill {pid}')
            else:
                os.kill(pid, signal.SIGTERM)
    return ret
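
For reference, a minimal usage sketch of kill_mets_server_zombies(), assuming the module is importable as ocrd_network.server_utils and runs on a host with a /proc filesystem; with dry_run=True it only reports the matching PIDs instead of sending SIGTERM:

from ocrd_network.server_utils import kill_mets_server_zombies

# Report METS Server processes older than 60 minutes without killing them.
zombie_pids = kill_mets_server_zombies(minutes_ago=60, dry_run=True)
print(f'Would terminate: {zombie_pids}')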