Passed — Pull Request on master (#1338), by unknown, created 02:19

ocrd_network.server_utils   B

Complexity

Total Complexity 48

Size/Duplication

Total Lines 241
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 48
eloc 186
dl 0
loc 241
rs 8.5599
c 0
b 0
f 0

15 Functions

Rating   Name   Duplication   Size   Complexity  
A _get_processor_job_log() 0 4 1
A create_workspace_if_not_exists() 0 11 2
A get_page_ids_list() 0 11 3
A create_processing_message() 0 20 2
A _get_processor_job() 0 9 2
A get_from_database_workflow_job() 0 7 2
A get_from_database_workspace() 0 11 2
A validate_and_return_mets_path() 0 5 2
C kill_mets_server_zombies() 0 29 9
B validate_job_input() 0 20 6
A parse_workflow_tasks() 0 7 2
A validate_first_task_input_file_groups_existence() 0 7 3
B get_workflow_content() 0 15 7
A raise_http_exception() 0 5 2
A validate_workflow() 0 12 3

How to fix: Complexity

Complex classes like ocrd_network.server_utils often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
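For example, in the source listed below, the helpers sharing the get_from_database_ prefix (get_from_database_workspace, get_from_database_workflow_job) form such a cohesive component. A minimal, hypothetical sketch of the Extract Class step could look like this — the DatabaseLookup class and its placement are illustrative assumptions, not part of ocrd_network:

# Hypothetical sketch (not part of the module below): bundle the database-lookup
# helpers of server_utils into one class so the module shrinks.
from logging import Logger

from fastapi import status

from .database import db_get_workflow_job, db_get_workspace
from .models import DBWorkflowJob, DBWorkspace
from .server_utils import raise_http_exception


class DatabaseLookup:
    """Cohesive component holding the 'get_from_database_*' logic."""

    def __init__(self, logger: Logger) -> None:
        self.logger = logger

    async def workflow_job(self, workflow_job_id: str) -> DBWorkflowJob:
        # Same lookup and error handling as the module-level helper
        try:
            return await db_get_workflow_job(workflow_job_id)
        except ValueError as error:
            message = f"Workflow job with id '{workflow_job_id}' not found in the DB."
            raise_http_exception(self.logger, status.HTTP_404_NOT_FOUND, message, error)

    async def workspace(self, workspace_id: str = None, workspace_mets_path: str = None) -> DBWorkspace:
        try:
            return await db_get_workspace(workspace_id, workspace_mets_path)
        except ValueError as error:
            message = f"Workspace with id '{workspace_id}' not found in the DB."
            raise_http_exception(self.logger, status.HTTP_404_NOT_FOUND, message, error)

The existing module-level functions could then delegate to (or be replaced by) such a class, removing several near-identical try/except blocks from server_utils.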

import os
import re
import signal
import sys
from pathlib import Path
from json import dumps, loads
from urllib.parse import urljoin
from typing import Dict, List, Optional, Union
from time import time

from fastapi import HTTPException, status, UploadFile
from fastapi.responses import FileResponse
from httpx import AsyncClient, Timeout
from logging import Logger
from requests import get as requests_get

from ocrd.resolver import Resolver
from ocrd.task_sequence import ProcessorTask
from ocrd.workspace import Workspace
from ocrd_validators import ParameterValidator

from .database import (
    db_create_workspace,
    db_get_processing_job,
    db_get_workflow_job,
    db_get_workflow_script,
    db_get_workspace
)
from .models import DBProcessorJob, DBWorkflowJob, DBWorkspace, PYJobInput, PYJobOutput
from .rabbitmq_utils import OcrdProcessingMessage
from .utils import (
    calculate_processing_request_timeout,
    expand_page_ids,
    generate_created_time,
    generate_workflow_content,
    get_ocrd_workspace_physical_pages
)


def create_processing_message(logger: Logger, job: DBProcessorJob) -> OcrdProcessingMessage:
    try:
        processing_message = OcrdProcessingMessage(
            job_id=job.job_id,
            processor_name=job.processor_name,
            created_time=generate_created_time(),
            path_to_mets=job.path_to_mets,
            workspace_id=job.workspace_id,
            input_file_grps=job.input_file_grps,
            output_file_grps=job.output_file_grps,
            page_id=job.page_id,
            parameters=job.parameters,
            result_queue_name=job.result_queue_name,
            callback_url=job.callback_url,
            internal_callback_url=job.internal_callback_url
        )
        return processing_message
    except ValueError as error:
        message = "Failed to create OcrdProcessingMessage from DBProcessorJob"
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


async def create_workspace_if_not_exists(logger: Logger, mets_path: str) -> DBWorkspace:
    try:
        # Core cannot create workspaces by API, but the Processing Server needs
        # the workspace in the database. The workspace is created if the path is
        # available locally and not existing in the database - since it has not
        # been uploaded through the Workspace Server.
        db_workspace = await db_create_workspace(mets_path)
        return db_workspace
    except FileNotFoundError as error:
        message = f"Mets file path not existing: {mets_path}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)


async def get_from_database_workflow_job(logger: Logger, workflow_job_id: str) -> DBWorkflowJob:
    try:
        workflow_job = await db_get_workflow_job(workflow_job_id)
        return workflow_job
    except ValueError as error:
        message = f"Workflow job with id '{workflow_job_id}' not found in the DB."
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)


async def get_from_database_workspace(
    logger: Logger,
    workspace_id: str = None,
    workspace_mets_path: str = None
) -> DBWorkspace:
    try:
        db_workspace = await db_get_workspace(workspace_id, workspace_mets_path)
        return db_workspace
    except ValueError as error:
        message = f"Workspace with id '{workspace_id}' not found in the DB."
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)


def get_page_ids_list(logger: Logger, mets_path: str, page_id: str) -> List[str]:
    try:
        if page_id:
            page_range = expand_page_ids(page_id)
        else:
            # If no page_id is specified, all physical pages are assigned as page range
            page_range = get_ocrd_workspace_physical_pages(mets_path=mets_path)
        return page_range
    except Exception as error:
        message = f"Failed to determine page range for mets path: {mets_path}"
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


async def _get_processor_job(logger: Logger, job_id: str) -> PYJobOutput:
    """ Return processing job-information from the database
    """
    try:
        job = await db_get_processing_job(job_id)
        return job.to_job_output()
    except ValueError as error:
        message = f"Processing job with id '{job_id}' not existing."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


async def _get_processor_job_log(logger: Logger, job_id: str) -> FileResponse:
    db_job = await _get_processor_job(logger, job_id)
    log_file_path = Path(db_job.log_file_path)
    return FileResponse(path=log_file_path, filename=log_file_path.name)


async def get_workflow_content(logger: Logger, workflow_id: str, workflow: Union[UploadFile, str, None]) -> str:
    if not workflow and not workflow_id:
        message = "Either 'workflow' must be uploaded as a file or 'workflow_id' must be provided. Both are missing."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
    if workflow_id:
        try:
            db_workflow = await db_get_workflow_script(workflow_id)
            return db_workflow.content
        except ValueError as error:
            message = f"Workflow with id '{workflow_id}' not found"
            raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)
    if isinstance(workflow, str):
        with open(workflow) as wf_file:
            return wf_file.read()
    return await generate_workflow_content(workflow)


async def validate_and_return_mets_path(logger: Logger, job_input: PYJobInput) -> str:
    if job_input.workspace_id:
        db_workspace = await get_from_database_workspace(logger, job_input.workspace_id)
        return db_workspace.workspace_mets_path
    return job_input.path_to_mets


def parse_workflow_tasks(logger: Logger, workflow_content: str) -> List[ProcessorTask]:
    try:
        tasks_list = workflow_content.splitlines()
        return [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()]
    except ValueError as error:
        message = "Failed parsing processing tasks from a workflow."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


def raise_http_exception(logger: Logger, status_code: int, message: str, error: Exception = None) -> None:
    if error:
        message = f"{message} {error}"
    logger.exception(message)
    raise HTTPException(status_code=status_code, detail=message)


def validate_job_input(logger: Logger, processor_name: str, ocrd_tool: dict, job_input: PYJobInput) -> None:
    # logger.warning(f"Job input: {job_input}")
    if bool(job_input.path_to_mets) == bool(job_input.workspace_id):
        message = (
            "Wrong processing job input format. "
            "Either 'path_to_mets' or 'workspace_id' must be provided. "
            "Both are provided or both are missing."
        )
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
    if not ocrd_tool:
        message = f"Failed to find an ocrd tool json for processor: {processor_name}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message)
    try:
        report = ParameterValidator(ocrd_tool).validate(dict(job_input.parameters))
    except Exception as error:
        message = f"Failed to validate processing job input against the ocrd tool json of processor: {processor_name}"
        raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, message, error)
    if report and not report.is_valid:
        message = f"Failed to validate processing job input against the tool json of processor: {processor_name}\n"
        raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, f"{message}{report.errors}")


def validate_workflow(logger: Logger, workflow: str) -> None:
    """
    Check whether the workflow is not empty and parseable to a list of ProcessorTask
    """
    if not workflow.strip():
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message="Workflow is invalid, empty.")
    try:
        tasks_list = workflow.splitlines()
        [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()]
    except ValueError as error:
        message = "Provided workflow script is invalid, failed to parse ProcessorTasks."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


def validate_first_task_input_file_groups_existence(logger: Logger, mets_path: str, input_file_grps: List[str]):
    # Validate the input file groups of the first task in the workflow
    available_groups = Workspace(Resolver(), Path(mets_path).parents[0]).mets.file_groups
    for group in input_file_grps:
        if group not in available_groups:
            message = f"Input file group '{group}' of the first processor not found: {input_file_grps}"
            raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)


def kill_mets_server_zombies(minutes_ago: Optional[int], dry_run: Optional[bool]) -> List[int]:
    """
    Find METS Server processes (matching 'ocrd workspace -U ... server start' in /proc)
    started more than `minutes_ago` minutes ago (default 90) and kill them,
    unless `dry_run` is set. Returns the list of matching PIDs.
    """
    if minutes_ago is None:
        minutes_ago = 90
    if dry_run is None:
        dry_run = False

    now = time()
    cmdline_pat = r'.*ocrd workspace -U.*server start $'
    ret = []
    for procdir in sorted(Path('/proc').glob('*'), key=os.path.getctime):
        if not procdir.is_dir():
            continue
        cmdline_file = procdir.joinpath('cmdline')
        if not cmdline_file.is_file():
            continue
        ctime_ago = int((now - procdir.stat().st_ctime) / 60)
        if ctime_ago < minutes_ago:
            continue
        cmdline = cmdline_file.read_text().replace('\x00', ' ')
        if re.match(cmdline_pat, cmdline):
            pid = int(procdir.name)
            ret.append(pid)
            print(f'METS Server with PID {pid} was created {ctime_ago} minutes ago, more than {minutes_ago}, '
                  f'so killing (cmdline="{cmdline}")', file=sys.stderr)
            if dry_run:
                print(f'[dry_run is active] kill {pid}')
            else:
                os.kill(pid, signal.SIGTERM)
    return ret