Passed
Push — master (18ecf8...407c02)
by Konstantin
03:05
created

ocrd_network.server_utils (rating B)

Complexity

Total Complexity 44

Size/Duplication

Total Lines 244
Duplicated Lines 0 %

Importance

Changes 0

Metric  Value
wmc     44
eloc    181
dl      0
loc     244
rs      8.8798
c       0
b       0
f       0

16 Functions

Rating   Name   Duplication   Size   Complexity  
A _get_processor_job_log() 0 4 1
A request_processor_server_tool_json() 0 14 3
A create_workspace_if_not_exists() 0 11 2
A get_page_ids_list() 0 11 3
A create_processing_message() 0 20 2
A _get_processor_job() 0 9 2
A get_from_database_workflow_job() 0 7 2
A get_from_database_workspace() 0 11 2
A validate_and_return_mets_path() 0 5 2
A forward_job_to_processor_server() 0 26 4
B validate_job_input() 0 20 6
A parse_workflow_tasks() 0 7 2
A validate_first_task_input_file_groups_existence() 0 7 3
A get_workflow_content() 0 12 5
A raise_http_exception() 0 5 2
A validate_workflow() 0 12 3

How to fix: Complexity

Complex classes like ocrd_network.server_utils often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate and is often faster.
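In ocrd_network.server_utils, for example, the get_from_database_* helpers and create_workspace_if_not_exists() all wrap the same pattern of database lookup plus HTTP error handling, so they form one such cohesive group. Below is a minimal sketch of that Extract Class step, assuming the new class would live next to server_utils inside the ocrd_network package; the name ProcessingDatabaseClient and its method names are hypothetical, not part of ocrd_network, and the method bodies simply regroup two of the existing helpers unchanged.

# Illustrative sketch only: ProcessingDatabaseClient and its method names are
# hypothetical; the bodies mirror the existing helpers in server_utils.
from logging import Logger

from fastapi import status

from .database import db_create_workspace, db_get_workflow_job
from .models import DBWorkflowJob, DBWorkspace
from .server_utils import raise_http_exception


class ProcessingDatabaseClient:
    """Bundles the database lookups currently spread across server_utils."""

    def __init__(self, logger: Logger) -> None:
        self.logger = logger

    async def workspace_if_not_exists(self, mets_path: str) -> DBWorkspace:
        # Mirrors create_workspace_if_not_exists(), with the logger held on the instance
        try:
            return await db_create_workspace(mets_path)
        except FileNotFoundError as error:
            message = f"Mets file path not existing: {mets_path}"
            raise_http_exception(self.logger, status.HTTP_404_NOT_FOUND, message, error)

    async def workflow_job(self, workflow_job_id: str) -> DBWorkflowJob:
        # Mirrors get_from_database_workflow_job()
        try:
            return await db_get_workflow_job(workflow_job_id)
        except ValueError as error:
            message = f"Workflow job with id '{workflow_job_id}' not found in the DB."
            raise_http_exception(self.logger, status.HTTP_404_NOT_FOUND, message, error)

Callers would then hold a single ProcessingDatabaseClient(logger) instance instead of threading the logger through every helper call, and server_utils itself would shrink to the validation and HTTP-forwarding functions.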

from fastapi import HTTPException, status, UploadFile
from fastapi.responses import FileResponse
from httpx import AsyncClient, Timeout
from json import dumps, loads
from logging import Logger
from pathlib import Path
from requests import get as requests_get
from typing import Dict, List, Union
from urllib.parse import urljoin

from ocrd.resolver import Resolver
from ocrd.task_sequence import ProcessorTask
from ocrd.workspace import Workspace
from ocrd_validators import ParameterValidator

from .database import (
    db_create_workspace,
    db_get_processing_job,
    db_get_workflow_job,
    db_get_workflow_script,
    db_get_workspace
)
from .models import DBProcessorJob, DBWorkflowJob, DBWorkspace, PYJobInput, PYJobOutput
from .rabbitmq_utils import OcrdProcessingMessage
from .utils import (
    calculate_processing_request_timeout,
    expand_page_ids,
    generate_created_time,
    generate_workflow_content,
    get_ocrd_workspace_physical_pages
)


def create_processing_message(logger: Logger, job: DBProcessorJob) -> OcrdProcessingMessage:
    try:
        processing_message = OcrdProcessingMessage(
            job_id=job.job_id,
            processor_name=job.processor_name,
            created_time=generate_created_time(),
            path_to_mets=job.path_to_mets,
            workspace_id=job.workspace_id,
            input_file_grps=job.input_file_grps,
            output_file_grps=job.output_file_grps,
            page_id=job.page_id,
            parameters=job.parameters,
            result_queue_name=job.result_queue_name,
            callback_url=job.callback_url,
            internal_callback_url=job.internal_callback_url
        )
        return processing_message
    except ValueError as error:
        message = f"Failed to create OcrdProcessingMessage from DBProcessorJob"
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


async def create_workspace_if_not_exists(logger: Logger, mets_path: str) -> DBWorkspace:
    try:
        # Core cannot create workspaces by API, but the Processing Server needs
        # the workspace in the database. The workspace is created if the path is
        # available locally and not existing in the database - since it has not
        # been uploaded through the Workspace Server.
        db_workspace = await db_create_workspace(mets_path)
        return db_workspace
    except FileNotFoundError as error:
        message = f"Mets file path not existing: {mets_path}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)


async def get_from_database_workflow_job(logger: Logger, workflow_job_id: str) -> DBWorkflowJob:
    try:
        workflow_job = await db_get_workflow_job(workflow_job_id)
        return workflow_job
    except ValueError as error:
        message = f"Workflow job with id '{workflow_job_id}' not found in the DB."
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)


async def get_from_database_workspace(
    logger: Logger,
    workspace_id: str = None,
    workspace_mets_path: str = None
) -> DBWorkspace:
    try:
        db_workspace = await db_get_workspace(workspace_id, workspace_mets_path)
        return db_workspace
    except ValueError as error:
        message = f"Workspace with id '{workspace_id}' not found in the DB."
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)


def get_page_ids_list(logger: Logger, mets_path: str, page_id: str) -> List[str]:
    try:
        if page_id:
            page_range = expand_page_ids(page_id)
        else:
            # If no page_id is specified, all physical pages are assigned as page range
            page_range = get_ocrd_workspace_physical_pages(mets_path=mets_path)
        return page_range
    except Exception as error:
        message = f"Failed to determine page range for mets path: {mets_path}"
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


async def _get_processor_job(logger: Logger, job_id: str) -> PYJobOutput:
    """ Return processing job-information from the database
    """
    try:
        job = await db_get_processing_job(job_id)
        return job.to_job_output()
    except ValueError as error:
        message = f"Processing job with id '{job_id}' not existing."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


async def _get_processor_job_log(logger: Logger, job_id: str) -> FileResponse:
    db_job = await _get_processor_job(logger, job_id)
    log_file_path = Path(db_job.log_file_path)
    return FileResponse(path=log_file_path, filename=log_file_path.name)


def request_processor_server_tool_json(logger: Logger, processor_server_base_url: str) -> Dict:
    # Request the ocrd tool json from the Processor Server
    try:
        response = requests_get(
            urljoin(base=processor_server_base_url, url="info"),
            headers={"Content-Type": "application/json"}
        )
    except Exception as error:
        message = f"Failed to retrieve ocrd tool json from: {processor_server_base_url}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)
    if response.status_code != 200:
        message = f"Failed to retrieve tool json from: {processor_server_base_url}, code: {response.status_code}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message)
    return response.json()

async def forward_job_to_processor_server(
    logger: Logger, job_input: PYJobInput, processor_server_base_url: str
) -> PYJobOutput:
    try:
        json_data = dumps(job_input.dict(exclude_unset=True, exclude_none=True))
    except Exception as error:
        message = f"Failed to json dump the PYJobInput: {job_input}"
        raise_http_exception(logger, status.HTTP_500_INTERNAL_SERVER_ERROR, message, error)

    # TODO: The amount of pages should come as a request input
    # TODO: cf https://github.com/OCR-D/core/pull/1030/files#r1152551161
    #  currently, use 200 as a default
    request_timeout = calculate_processing_request_timeout(amount_pages=200, timeout_per_page=20.0)

    # Post a processing job to the Processor Server asynchronously
    async with AsyncClient(timeout=Timeout(timeout=request_timeout, connect=30.0)) as client:
        response = await client.post(
            urljoin(base=processor_server_base_url, url="run"),
            headers={"Content-Type": "application/json"},
            json=loads(json_data)
        )
    if response.status_code != 202:
        message = f"Failed to post '{job_input.processor_name}' job to: {processor_server_base_url}"
        raise_http_exception(logger, status.HTTP_500_INTERNAL_SERVER_ERROR, message)
    job_output = response.json()
    return job_output


async def get_workflow_content(logger: Logger, workflow_id: str, workflow: Union[UploadFile, None]) -> str:
    if not workflow and not workflow_id:
        message = "Either 'workflow' must be uploaded as a file or 'workflow_id' must be provided. Both are missing."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
    if workflow_id:
        try:
            db_workflow = await db_get_workflow_script(workflow_id)
            return db_workflow.content
        except ValueError as error:
            message = f"Workflow with id '{workflow_id}' not found"
            raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)
    return await generate_workflow_content(workflow)


async def validate_and_return_mets_path(logger: Logger, job_input: PYJobInput) -> str:
    if job_input.workspace_id:
        db_workspace = await get_from_database_workspace(logger, job_input.workspace_id)
        return db_workspace.workspace_mets_path
    return job_input.path_to_mets


def parse_workflow_tasks(logger: Logger, workflow_content: str) -> List[ProcessorTask]:
    try:
        tasks_list = workflow_content.splitlines()
        return [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()]
    except ValueError as error:
        message = f"Failed parsing processing tasks from a workflow."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


def raise_http_exception(logger: Logger, status_code: int, message: str, error: Exception = None) -> None:
    if error:
        message = f"{message} {error}"
    logger.exception(f"{message}")
    raise HTTPException(status_code=status_code, detail=message)


def validate_job_input(logger: Logger, processor_name: str, ocrd_tool: dict, job_input: PYJobInput) -> None:
    # logger.warning(f"Job input: {job_input}")
    if bool(job_input.path_to_mets) == bool(job_input.workspace_id):
        message = (
            "Wrong processing job input format. "
            "Either 'path_to_mets' or 'workspace_id' must be provided. "
            "Both are provided or both are missing."
        )
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
    if not ocrd_tool:
        message = f"Failed to load the ocrd tool json of processor: {processor_name}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message)
    try:
        report = ParameterValidator(ocrd_tool).validate(dict(job_input.parameters))
    except Exception as error:
        message = f"Failed to validate processing job input against the ocrd tool json of processor: {processor_name}"
        raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, message, error)
    if report and not report.is_valid:
        message = f"Failed to validate processing job input against the tool json of processor: {processor_name}\n"
        raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, f"{message}{report.errors}")


def validate_workflow(logger: Logger, workflow: str) -> None:
    """
    Check whether the workflow is not empty and parseable to a list of ProcessorTask
    """
    if not workflow.strip():
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message="Workflow is invalid, empty.")
    try:
        tasks_list = workflow.splitlines()
        [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()]
    except ValueError as error:
        message = "Provided workflow script is invalid, failed to parse ProcessorTasks."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)


def validate_first_task_input_file_groups_existence(logger: Logger, mets_path: str, input_file_grps: List[str]):
    # Validate the input file groups of the first task in the workflow
    available_groups = Workspace(Resolver(), Path(mets_path).parents[0]).mets.file_groups
    for group in input_file_grps:
        if group not in available_groups:
            message = f"Input file group '{group}' of the first processor not found: {input_file_grps}"
            raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)