Passed
Pull Request — master (#1083)
by Konstantin
02:49
created

ocrd_network.database   B

Complexity

Total Complexity 50

Size/Duplication

Total Lines 200
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 50
eloc 143
dl 0
loc 200
rs 8.4
c 0
b 0
f 0

15 Functions

Rating   Name   Duplication   Size   Complexity  
A sync_db_update_processing_job() 0 3 1
C db_update_processing_job() 0 24 9
A db_get_processing_job() 0 6 2
A sync_db_get_processing_job() 0 3 1
A sync_db_update_workspace() 0 3 1
A initiate_database() 0 5 1
A db_get_workflow_job() 0 5 2
A sync_db_get_workflow_job() 0 3 1
A sync_initiate_database() 0 3 1
A db_create_workspace() 0 17 3
F db_update_workspace() 0 43 18
A db_get_processing_jobs() 0 3 1
B db_get_workspace() 0 17 7
A sync_db_get_workspace() 0 3 1
A sync_db_get_processing_jobs() 0 3 1

How to fix   Complexity   

Complexity

Complex classes like ocrd_network.database often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
""" The database is used to store information regarding jobs and workspaces.
2
3
Jobs: for every process-request a job is inserted into the database with a uuid, status and
4
information about the process like parameters and file groups. It is mainly used to track the status
5
(`ocrd_network.models.job.StateEnum`) of a job so that the state of a job can be queried. Finished
6
jobs are not deleted from the database.
7
8
Workspaces: A job or a processor always runs on a workspace. So a processor needs the information
9
where the workspace is available. This information can be set by providing an absolute path or a
10
workspace_id. With the latter, the database is used to convert the workspace_id to a path.
11
12
XXX: Currently the information is not preserved after the processing-server shuts down as the
13
database (runs in docker) currently has no volume set.
14
"""
15
from beanie import init_beanie
16
from beanie.operators import In
17
from motor.motor_asyncio import AsyncIOMotorClient
18
from uuid import uuid4
19
from pathlib import Path
20
from typing import List
21
22
from .models import (
23
    DBProcessorJob,
24
    DBWorkflowJob,
25
    DBWorkspace
26
)
27
from .utils import call_sync
28
29
30
async def initiate_database(db_url: str):
    """ Connect to the database at `db_url` and initialize beanie

    A motor client is created for `db_url`. The database named in the URL is used; if the
    URL names none, the database `ocrd` is used. The three document models of this module
    (processor jobs, workflow jobs, workspaces) are registered with beanie.
    """
    mongo_client = AsyncIOMotorClient(db_url)
    database = mongo_client.get_default_database(default='ocrd')
    models = [DBProcessorJob, DBWorkflowJob, DBWorkspace]
    await init_beanie(database=database, document_models=models)
36
37
38
@call_sync
async def sync_initiate_database(db_url: str):
    """ `call_sync`-wrapped variant of `initiate_database`

    NOTE(review): `call_sync` comes from `.utils`; presumably it makes this coroutine
    callable from synchronous code - confirm there.
    """
    await initiate_database(db_url=db_url)
41
42
43
async def db_create_workspace(mets_path: str) -> DBWorkspace:
    """ Return the workspace-database entry for a mets-path, creating it if necessary

    If an entry with this `workspace_mets_path` already exists it is returned unchanged.
    Otherwise a new entry is saved with a fresh uuid4 as `workspace_id`, the parent
    directory of the mets file as `workspace_path` and empty identifiers.

    Raises `FileNotFoundError` if `mets_path` does not exist on disk.
    """
    mets_file = Path(mets_path)
    if not mets_file.exists():
        raise FileNotFoundError(f'Cannot create DB workspace entry, `{mets_path}` does not exist!')

    try:
        existing_entry = await db_get_workspace(workspace_mets_path=mets_path)
        return existing_entry
    except ValueError:
        # The lookup signals "not in the DB" with a ValueError: create the entry now
        new_entry = DBWorkspace(
            workspace_id=str(uuid4()),
            workspace_path=mets_file.parent,
            workspace_mets_path=mets_path,
            ocrd_identifier="",
            bagit_profile_identifier="",
        )
        await new_entry.save()
        return new_entry
60
61
62
async def db_get_workspace(workspace_id: str = None, workspace_mets_path: str = None) -> DBWorkspace:
    """ Look up a workspace-database entry by id and/or by mets-path

    At least one of the two search keys must be given. If both are given, both lookups are
    performed (id first) and the entry found via `workspace_mets_path` is returned.

    Raises `ValueError` if no search key is given or if a lookup finds no entry.
    """
    if not (workspace_id or workspace_mets_path):
        raise ValueError(f'Either `workspace_id` or `workspace_mets_path` field must be used as a search key')

    found = None
    if workspace_id:
        id_query = DBWorkspace.workspace_id == workspace_id
        found = await DBWorkspace.find_one(id_query)
        if not found:
            raise ValueError(f'Workspace with id "{workspace_id}" not in the DB.')
    if workspace_mets_path:
        # Deliberately overrides a result of the id lookup above
        path_query = DBWorkspace.workspace_mets_path == workspace_mets_path
        found = await DBWorkspace.find_one(path_query)
        if not found:
            raise ValueError(f'Workspace with path "{workspace_mets_path}" not in the DB.')
    return found
79
80
81
@call_sync
async def sync_db_get_workspace(workspace_id: str = None, workspace_mets_path: str = None) -> DBWorkspace:
    """ `call_sync`-wrapped variant of `db_get_workspace`

    Both search keys are forwarded unchanged.
    NOTE(review): `call_sync` comes from `.utils`; presumably it makes this coroutine
    callable from synchronous code - confirm there.
    """
    workspace = await db_get_workspace(
        workspace_id=workspace_id,
        workspace_mets_path=workspace_mets_path
    )
    return workspace
84
85
86
async def db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs) -> DBWorkspace:
    """ Overwrite fields of an existing workspace-database entry and save it

    The entry is looked up with `db_get_workspace`: at least one of `workspace_id` and
    `workspace_mets_path` must be given. If both are given, both must exist in the DB and
    the entry found via `workspace_mets_path` is the one that gets updated.

    Every keyword argument names a field of the entry and carries its new value. Only the
    fields listed in `updatable_fields` below may be changed.

    Returns the updated and saved entry.

    Raises `ValueError` if no search key is given, if the entry is not in the DB, if a
    keyword is not an attribute of the entry ("not available") or if it is an attribute
    which must not be changed ("not updatable"). Nothing is saved in these cases.
    """
    updatable_fields = (
        'workspace_id',
        'workspace_mets_path',
        'ocrd_identifier',
        'bagit_profile_identifier',
        'ocrd_base_version_checksum',
        'ocrd_mets',
        'bag_info_adds',
        'deleted',
        'mets_server_url',
    )
    # Reuse the lookup (and its error messages) instead of duplicating it here
    workspace = await db_get_workspace(workspace_id=workspace_id, workspace_mets_path=workspace_mets_path)

    # Snapshot of the attribute names before anything is modified
    available_fields = set(workspace.__dict__)
    for key, value in kwargs.items():
        if key not in available_fields:
            raise ValueError(f'Field "{key}" is not available.')
        if key not in updatable_fields:
            raise ValueError(f'Field "{key}" is not updatable.')
        setattr(workspace, key, value)
    await workspace.save()
    return workspace
129
130
131
@call_sync
async def sync_db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs) -> DBWorkspace:
    """ `call_sync`-wrapped variant of `db_update_workspace`

    The search keys and all field updates in `kwargs` are forwarded unchanged.
    NOTE(review): `call_sync` comes from `.utils`; presumably it makes this coroutine
    callable from synchronous code - confirm there.
    """
    updated_workspace = await db_update_workspace(
        workspace_id=workspace_id,
        workspace_mets_path=workspace_mets_path,
        **kwargs
    )
    return updated_workspace
134
135
136
async def db_get_processing_job(job_id: str) -> DBProcessorJob:
    """ Look up the processing job with the given `job_id`

    Raises `ValueError` if no such job is in the DB.
    """
    query = DBProcessorJob.job_id == job_id
    processing_job = await DBProcessorJob.find_one(query)
    if processing_job:
        return processing_job
    raise ValueError(f'Processing job with id "{job_id}" not in the DB.')
142
143
144
@call_sync
async def sync_db_get_processing_job(job_id: str) -> DBProcessorJob:
    """ `call_sync`-wrapped variant of `db_get_processing_job`

    NOTE(review): `call_sync` comes from `.utils`; presumably it makes this coroutine
    callable from synchronous code - confirm there.
    """
    processing_job = await db_get_processing_job(job_id)
    return processing_job
147
148
149
async def db_update_processing_job(job_id: str, **kwargs) -> DBProcessorJob:
    """ Overwrite fields of an existing processing job and save it

    The job is looked up by `job_id` with `db_get_processing_job`. Every keyword argument
    names a field of the job and carries its new value. Only the fields listed in
    `updatable_fields` below may be changed.

    Returns the updated and saved job.

    Raises `ValueError` if the job is not in the DB, if a keyword is not an attribute of
    the job ("not available") or if it is an attribute which must not be changed
    ("not updatable"). Nothing is saved in these cases.
    """
    updatable_fields = ('state', 'start_time', 'end_time', 'path_to_mets', 'exec_time')
    # Reuse the lookup (and its error message) instead of duplicating it here
    job = await db_get_processing_job(job_id)

    # Snapshot of the attribute names before anything is modified
    available_fields = set(job.__dict__)
    for key, value in kwargs.items():
        if key not in available_fields:
            raise ValueError(f'Field "{key}" is not available.')
        if key not in updatable_fields:
            raise ValueError(f'Field "{key}" is not updatable.')
        setattr(job, key, value)
    await job.save()
    return job
173
174
175
@call_sync
async def sync_db_update_processing_job(job_id: str, **kwargs) -> DBProcessorJob:
    """ `call_sync`-wrapped variant of `db_update_processing_job`

    The job id and all field updates in `kwargs` are forwarded unchanged.
    NOTE(review): `call_sync` comes from `.utils`; presumably it makes this coroutine
    callable from synchronous code - confirm there.
    """
    updated_job = await db_update_processing_job(job_id=job_id, **kwargs)
    return updated_job
178
179
180
async def db_get_workflow_job(job_id: str) -> DBWorkflowJob:
    """ Look up the workflow job with the given `job_id`

    Raises `ValueError` if no such job is in the DB.
    """
    query = DBWorkflowJob.job_id == job_id
    workflow_job = await DBWorkflowJob.find_one(query)
    if workflow_job:
        return workflow_job
    raise ValueError(f'Workflow job with id "{job_id}" not in the DB.')
185
186
187
@call_sync
async def sync_db_get_workflow_job(job_id: str) -> DBWorkflowJob:
    """ `call_sync`-wrapped variant of `db_get_workflow_job`

    NOTE(review): `call_sync` comes from `.utils`; presumably it makes this coroutine
    callable from synchronous code - confirm there.
    """
    workflow_job = await db_get_workflow_job(job_id)
    return workflow_job
190
191
192
async def db_get_processing_jobs(job_ids: List[str]) -> List[DBProcessorJob]:
    """ Fetch the processing jobs whose `job_id` is contained in `job_ids`

    The result of a single `In` query is returned as a list. Unlike
    `db_get_processing_job`, this function itself raises no error for ids without a
    matching job.
    """
    return await DBProcessorJob.find(In(DBProcessorJob.job_id, job_ids)).to_list()
195
196
197
@call_sync
async def sync_db_get_processing_jobs(job_ids: List[str]) -> List[DBProcessorJob]:
    """ `call_sync`-wrapped variant of `db_get_processing_jobs`

    NOTE(review): `call_sync` comes from `.utils`; presumably it makes this coroutine
    callable from synchronous code - confirm there.
    """
    return await db_get_processing_jobs(job_ids)
200