Passed
Pull Request — master (#1269), created by unknown, 02:42

ocrd_network.server_utils — Rating: B

Complexity
    Total Complexity    44

Size/Duplication
    Total Lines         245
    Duplicated Lines    0 %

Importance
    Changes             0

Metric    Value
wmc       44
eloc      181
dl        0
loc       245
rs        8.8798
c         0
b         0
f         0

16 Functions

Rating   Name                                                 Duplication (lines)   Size (lines)   Complexity
A        validate_and_return_mets_path()                      0                     5              2
A        _get_processor_job_log()                             0                     4              1
A        forward_job_to_processor_server()                    0                     26             4
A        request_processor_server_tool_json()                 0                     14             3
A        create_workspace_if_not_exists()                     0                     11             2
A        parse_workflow_tasks()                               0                     7              2
A        get_workflow_content()                               0                     12             5
A        get_page_ids_list()                                  0                     11             3
A        create_processing_message()                          0                     20             2
A        _get_processor_job()                                 0                     9              2
A        get_from_database_workflow_job()                     0                     7              2
A        get_from_database_workspace()                        0                     11             2
B        validate_job_input()                                 0                     20             6
A        validate_first_task_input_file_groups_existence()    0                     7              3
A        raise_http_exception()                               0                     5              2
A        validate_workflow()                                  0                     12             3

How to fix: Complexity

Complex modules like ocrd_network.server_utils often do a lot of different things. To break such a module down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields and methods that belong together, you can apply the Extract Class refactoring, as sketched below. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often the faster route.
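As an illustration only (not part of this PR), the lookup helpers sharing the get_from_database_ prefix could be grouped behind one small class. The name DatabaseGateway below is hypothetical, and the sketch assumes the existing module-level helpers keep their current signatures:

class DatabaseGateway:
    """Hypothetical Extract Class over the 'get_from_database_' helpers (illustrative only)."""

    def __init__(self, logger: Logger):
        # Hold the logger once instead of threading it through every call
        self.logger = logger

    async def workflow_job(self, workflow_job_id: str) -> DBWorkflowJob:
        # Delegates to the existing module-level helper
        return await get_from_database_workflow_job(self.logger, workflow_job_id)

    async def workspace(self, workspace_id: str = None, workspace_mets_path: str = None) -> DBWorkspace:
        return await get_from_database_workspace(self.logger, workspace_id, workspace_mets_path)

Route handlers would then construct a single gateway and call its methods, which shrinks the module's public surface without changing behaviour.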

from fastapi import HTTPException, status, UploadFile
from fastapi.responses import FileResponse
from httpx import AsyncClient, Timeout
from json import dumps, loads
from logging import Logger
from pathlib import Path
from requests import get as requests_get
from typing import Dict, List, Union
from urllib.parse import urljoin

from ocrd.resolver import Resolver
from ocrd.task_sequence import ProcessorTask
from ocrd.workspace import Workspace
from ocrd_validators import ParameterValidator

from .database import (
    db_create_workspace,
    db_get_processing_job,
    db_get_workflow_job,
    db_get_workflow_script,
    db_get_workspace
)
from .models import DBProcessorJob, DBWorkflowJob, DBWorkspace, PYJobInput, PYJobOutput
from .rabbitmq_utils import OcrdProcessingMessage
from .utils import (
    calculate_processing_request_timeout,
    expand_page_ids,
    generate_created_time,
    generate_workflow_content,
    get_ocrd_workspace_physical_pages
)

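# Build an OcrdProcessingMessage for the RabbitMQ queue from a persisted job entry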
def create_processing_message(logger: Logger, job: DBProcessorJob) -> OcrdProcessingMessage:
    try:
        processing_message = OcrdProcessingMessage(
            job_id=job.job_id,
            processor_name=job.processor_name,
            created_time=generate_created_time(),
            path_to_mets=job.path_to_mets,
            workspace_id=job.workspace_id,
            input_file_grps=job.input_file_grps,
            output_file_grps=job.output_file_grps,
            page_id=job.page_id,
            parameters=job.parameters,
            result_queue_name=job.result_queue_name,
            callback_url=job.callback_url,
            internal_callback_url=job.internal_callback_url
        )
        return processing_message
    except ValueError as error:
        message = "Failed to create OcrdProcessingMessage from DBProcessorJob"
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)

async def create_workspace_if_not_exists(logger: Logger, mets_path: str) -> DBWorkspace:
    try:
        # Core cannot create workspaces via the API, but the Processing Server needs
        # the workspace in the database. The workspace is created if the path is
        # available locally and does not yet exist in the database - i.e., it has
        # not been uploaded through the Workspace Server.
        db_workspace = await db_create_workspace(mets_path)
        return db_workspace
    except FileNotFoundError as error:
        message = f"METS file path does not exist: {mets_path}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)

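# Fetch a workflow job entry from the database, or answer 404 if it is unknown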
async def get_from_database_workflow_job(logger: Logger, workflow_job_id: str) -> DBWorkflowJob:
    try:
        workflow_job = await db_get_workflow_job(workflow_job_id)
        return workflow_job
    except ValueError as error:
        message = f"Workflow job with id '{workflow_job_id}' not found in the DB."
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)

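# Fetch a workspace entry by workspace id or by METS path, or answer 404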
async def get_from_database_workspace(
    logger: Logger,
    workspace_id: Union[str, None] = None,
    workspace_mets_path: Union[str, None] = None
) -> DBWorkspace:
    try:
        db_workspace = await db_get_workspace(workspace_id, workspace_mets_path)
        return db_workspace
    except ValueError as error:
        message = f"Workspace with id '{workspace_id}' or mets path '{workspace_mets_path}' not found in the DB."
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)

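# Expand the page_id expression (e.g. ranges) into an explicit list of physical page ids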
def get_page_ids_list(logger: Logger, mets_path: str, page_id: str) -> List[str]:
    try:
        if page_id:
            page_range = expand_page_ids(page_id)
        else:
            # If no page_id is specified, all physical pages are assigned as the page range
            page_range = get_ocrd_workspace_physical_pages(mets_path=mets_path)
        return page_range
    except Exception as error:
        message = f"Failed to determine page range for mets path: {mets_path}"
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)

async def _get_processor_job(logger: Logger, job_id: str) -> PYJobOutput:
    """ Return processing job information from the database
    """
    try:
        job = await db_get_processing_job(job_id)
        return job.to_job_output()
    except ValueError as error:
        message = f"Processing job with id '{job_id}' does not exist."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)

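# Serve the log file of a processing job as a downloadable file response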
async def _get_processor_job_log(logger: Logger, job_id: str) -> FileResponse:
    job_output = await _get_processor_job(logger, job_id)
    log_file_path = Path(job_output.log_file_path)
    return FileResponse(path=log_file_path, filename=log_file_path.name)

def request_processor_server_tool_json(logger: Logger, processor_server_base_url: str) -> Dict:
    # Request the ocrd tool json from the Processor Server
    try:
        response = requests_get(
            urljoin(base=processor_server_base_url, url="info"),
            headers={"Content-Type": "application/json"}
        )
    except Exception as error:
        message = f"Failed to retrieve ocrd tool json from: {processor_server_base_url}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)
    # Check the status outside the try block so the HTTPException raised here
    # is not swallowed by the broad except above
    if response.status_code != 200:
        message = f"Failed to retrieve tool json from: {processor_server_base_url}, code: {response.status_code}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message)
    return response.json()

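# Serialize the job input and POST it asynchronously to a standalone Processor Server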
async def forward_job_to_processor_server(
    logger: Logger, job_input: PYJobInput, processor_server_base_url: str
) -> PYJobOutput:
    try:
        json_data = dumps(job_input.dict(exclude_unset=True, exclude_none=True))
    except Exception as error:
        message = f"Failed to json dump the PYJobInput: {job_input}"
        raise_http_exception(logger, status.HTTP_500_INTERNAL_SERVER_ERROR, message, error)

    # TODO: The number of pages should come as a request input
    # TODO: cf https://github.com/OCR-D/core/pull/1030/files#r1152551161
    #  currently, use 200 as a default
    request_timeout = calculate_processing_request_timeout(amount_pages=200, timeout_per_page=20.0)

    # Post a processing job to the Processor Server asynchronously
    async with AsyncClient(timeout=Timeout(timeout=request_timeout, connect=30.0)) as client:
        response = await client.post(
            urljoin(base=processor_server_base_url, url="run"),
            headers={"Content-Type": "application/json"},
            json=loads(json_data)
        )
    if response.status_code != 202:
        message = f"Failed to post '{job_input.processor_name}' job to: {processor_server_base_url}"
        raise_http_exception(logger, status.HTTP_500_INTERNAL_SERVER_ERROR, message)
    job_output = response.json()
    return job_output

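# Resolve the workflow content either from the database (by workflow_id) or from the uploaded file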
async def get_workflow_content(logger: Logger, workflow_id: str, workflow: Union[UploadFile, None]) -> str:
    if not workflow and not workflow_id:
        message = "Either 'workflow' must be uploaded as a file or 'workflow_id' must be provided. Both are missing."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
    if workflow_id:
        try:
            db_workflow = await db_get_workflow_script(workflow_id)
            return db_workflow.content
        except ValueError as error:
            message = f"Workflow with id '{workflow_id}' not found"
            raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message, error)
    return await generate_workflow_content(workflow)

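# Resolve the METS path from the referenced workspace if a workspace_id is given, else from the job input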
async def validate_and_return_mets_path(logger: Logger, job_input: PYJobInput) -> str:
    if job_input.workspace_id:
        db_workspace = await get_from_database_workspace(logger, job_input.workspace_id)
        return db_workspace.workspace_mets_path
    return job_input.path_to_mets

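# Parse each non-empty line of the workflow into a ProcessorTask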
def parse_workflow_tasks(logger: Logger, workflow_content: str) -> List[ProcessorTask]:
    try:
        tasks_list = workflow_content.splitlines()
        return [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()]
    except ValueError as error:
        message = "Failed to parse processing tasks from the workflow."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)

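# Log the error and convert it into a FastAPI HTTPException with the given status code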
def raise_http_exception(logger: Logger, status_code: int, message: str, error: Exception = None) -> None:
    if error:
        message = f"{message} {error}"
    logger.exception(message)
    raise HTTPException(status_code=status_code, detail=message)

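# Validate the processing job input against the processor's ocrd tool json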
def validate_job_input(logger: Logger, processor_name: str, ocrd_tool: dict, job_input: PYJobInput) -> None:
    # logger.warning(f"Job input: {job_input}")
    if bool(job_input.path_to_mets) == bool(job_input.workspace_id):
        message = (
            "Wrong processing job input format. "
            "Either 'path_to_mets' or 'workspace_id' must be provided. "
            "Both are provided or both are missing."
        )
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)
    if not ocrd_tool:
        message = f"Failed to load the ocrd tool json of processor: {processor_name}"
        raise_http_exception(logger, status.HTTP_404_NOT_FOUND, message)
    try:
        report = ParameterValidator(ocrd_tool).validate(dict(job_input.parameters))
    except Exception as error:
        message = f"Failed to validate processing job input against the ocrd tool json of processor: {processor_name}"
        raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, message, error)
    if report and not report.is_valid:
        message = f"Failed to validate processing job input against the tool json of processor: {processor_name}\n"
        raise_http_exception(logger, status.HTTP_400_BAD_REQUEST, f"{message}{report.errors}")

def validate_workflow(logger: Logger, workflow: str) -> None:
    """
    Check whether the workflow is non-empty and parseable into a list of ProcessorTasks
    """
    if not workflow.strip():
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message="Workflow is invalid: empty.")
    try:
        tasks_list = workflow.splitlines()
        [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()]
    except ValueError as error:
        message = "Provided workflow script is invalid, failed to parse ProcessorTasks."
        raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message, error)

def validate_first_task_input_file_groups_existence(logger: Logger, mets_path: str, input_file_grps: List[str]):
    # Validate the input file groups of the first task in the workflow
    available_groups = Workspace(Resolver(), Path(mets_path).parent).mets.file_groups
    for group in input_file_grps:
        if group not in available_groups:
            message = f"Input file group '{group}' of the first processor not found: {input_file_grps}"
            raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)