Passed
Pull Request — master (#1164)
by
unknown
03:47
created

ocrd_network.database   F

Complexity

Total Complexity 64

Size/Duplication

Total Lines 260
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 64
eloc 180
dl 0
loc 260
rs 3.28
c 0
b 0
f 0

26 Functions

Rating   Name   Duplication   Size   Complexity  
A db_create_workspace() 0 17 3
A initiate_database() 0 5 1
A db_get_workflow_job() 0 5 2
A sync_db_update_processing_job() 0 3 1
A db_create_workflow_script() 0 2 1
A sync_db_get_workflow_script() 0 3 1
A sync_db_create_processing_job() 0 3 1
A sync_db_create_workflow_script() 0 3 1
A sync_db_create_workflow_job() 0 3 1
A db_get_workflow_script() 0 5 2
A sync_db_get_workflow_job() 0 3 1
A sync_initiate_database() 0 3 1
F db_update_workspace() 0 43 18
C db_update_processing_job() 0 26 10
A db_create_processing_job() 0 2 1
A db_get_processing_jobs() 0 3 1
A db_find_first_workflow_script_by_content() 0 5 2
B db_get_workspace() 0 17 7
A db_create_workflow_job() 0 2 1
A db_get_processing_job() 0 6 2
A sync_db_get_workspace() 0 3 1
A sync_db_find_first_workflow_script_by_content() 0 3 1
A sync_db_get_processing_job() 0 3 1
A sync_db_get_processing_jobs() 0 3 1
A sync_db_create_workspace() 0 3 1
A sync_db_update_workspace() 0 3 1

How to fix   Complexity   

Complexity

Complex classes like ocrd_network.database often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
""" The database is used to store information regarding jobs and workspaces.
2
3
Jobs: for every process-request a job is inserted into the database with a uuid, status and
4
information about the process like parameters and file groups. It is mainly used to track the status
5
(`ocrd_network.models.job.StateEnum`) of a job so that the state of a job can be queried. Finished
6
jobs are not deleted from the database.
7
8
Workspaces: A job or a processor always runs on a workspace. So a processor needs the information
9
where the workspace is available. This information can be set by providing an absolute path or a
10
workspace_id. With the latter, the database is used to convert the workspace_id to a path.
11
12
XXX: Currently the information is not preserved after the processing-server shuts down as the
13
database (runs in docker) currently has no volume set.
14
"""
15
from beanie import init_beanie
16
from beanie.operators import In
17
from motor.motor_asyncio import AsyncIOMotorClient
18
from uuid import uuid4
19
from pathlib import Path
20
from typing import List
21
22
from .models import (
23
    DBProcessorJob,
24
    DBWorkflowJob,
25
    DBWorkspace,
26
    DBWorkflowScript,
27
)
28
from .utils import call_sync
29
30
31
async def initiate_database(db_url: str, db_name: str = 'ocrd'):
    """ Initialise beanie with all document models used by this module.

    A motor client is created for `db_url`; `db_name` is handed to
    `get_default_database` as its `default` argument.
    """
    models = [DBProcessorJob, DBWorkflowJob, DBWorkspace, DBWorkflowScript]
    database = AsyncIOMotorClient(db_url).get_default_database(default=db_name)
    await init_beanie(database=database, document_models=models)
37
38
39
@call_sync
async def sync_initiate_database(db_url: str, db_name: str = 'ocrd'):
    """ `call_sync`-wrapped variant of `initiate_database`.
    """
    await initiate_database(db_url=db_url, db_name=db_name)
42
43
44
async def db_create_workspace(mets_path: str) -> DBWorkspace:
    """ Return the workspace-database entry for a mets-path, creating it when missing

    Raises `FileNotFoundError` when `mets_path` does not exist on disk. An entry
    that is already stored for this mets-path is returned unchanged; otherwise a
    new entry with a random uuid as `workspace_id` is saved and returned.
    """
    mets_file = Path(mets_path)
    if not mets_file.exists():
        raise FileNotFoundError(f'Cannot create DB workspace entry, `{mets_path}` does not exist!')
    try:
        existing_entry = await db_get_workspace(workspace_mets_path=mets_path)
    except ValueError:
        # The lookup signals "no such entry" with a ValueError, so create one
        entry_fields = {
            'workspace_id': str(uuid4()),
            'workspace_path': mets_file.parent,
            'workspace_mets_path': mets_path,
            'ocrd_identifier': "",
            'bagit_profile_identifier': "",
        }
        new_entry = DBWorkspace(**entry_fields)
        await new_entry.save()
        return new_entry
    else:
        return existing_entry
61
62
63
@call_sync
async def sync_db_create_workspace(mets_path: str) -> DBWorkspace:
    """ `call_sync`-wrapped variant of `db_create_workspace`.
    """
    workspace_entry = await db_create_workspace(mets_path=mets_path)
    return workspace_entry
66
67
68
async def db_get_workspace(workspace_id: str = None, workspace_mets_path: str = None) -> DBWorkspace:
    """ Look up a workspace entry by `workspace_id` and/or `workspace_mets_path`

    At least one of the two keys must be truthy, otherwise a `ValueError` is
    raised. Every given key is looked up in turn and must match an entry
    (`ValueError` otherwise); when both keys are given, the entry found via
    `workspace_mets_path` is the one returned.
    """
    if not (workspace_id or workspace_mets_path):
        raise ValueError('Either `workspace_id` or `workspace_mets_path` field must be used as a search key')

    search_keys = (
        (workspace_id, 'workspace_id', 'id'),
        (workspace_mets_path, 'workspace_mets_path', 'path'),
    )
    found = None
    for key_value, field_name, label in search_keys:
        if not key_value:
            continue
        found = await DBWorkspace.find_one(getattr(DBWorkspace, field_name) == key_value)
        if not found:
            raise ValueError(f'Workspace with {label} "{key_value}" not in the DB.')
    return found
85
86
87
@call_sync
async def sync_db_get_workspace(workspace_id: str = None, workspace_mets_path: str = None) -> DBWorkspace:
    """ `call_sync`-wrapped variant of `db_get_workspace`.
    """
    search_keys = dict(workspace_id=workspace_id, workspace_mets_path=workspace_mets_path)
    return await db_get_workspace(**search_keys)
90
91
92
# Fields of a workspace entry that `db_update_workspace` is allowed to overwrite.
_UPDATABLE_WORKSPACE_FIELDS = frozenset({
    'workspace_id',
    'workspace_mets_path',
    'ocrd_identifier',
    'bagit_profile_identifier',
    'ocrd_base_version_checksum',
    'ocrd_mets',
    'bag_info_adds',
    'deleted',
    'mets_server_url',
})


async def db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs) -> DBWorkspace:
    """ Update fields of an existing workspace entry and save it

    The entry is located exactly as in `db_get_workspace` (same search keys,
    same `ValueError`s when no key is given or nothing is found). Every keyword
    argument names a field to overwrite with the given value.

    Raises `ValueError` when a keyword is not an attribute of the entry
    ("not available") or is not one of the updatable fields ("not updatable").
    The entry is only saved after all keywords were accepted.
    """
    # Delegate the lookup instead of duplicating it; messages are identical.
    workspace = await db_get_workspace(workspace_id=workspace_id, workspace_mets_path=workspace_mets_path)

    available_fields = set(workspace.__dict__)
    for key, value in kwargs.items():
        if key not in available_fields:
            raise ValueError(f'Field "{key}" is not available.')
        if key not in _UPDATABLE_WORKSPACE_FIELDS:
            raise ValueError(f'Field "{key}" is not updatable.')
        setattr(workspace, key, value)
    await workspace.save()
    return workspace
135
136
137
@call_sync
async def sync_db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs) -> DBWorkspace:
    """ `call_sync`-wrapped variant of `db_update_workspace`.
    """
    updated_entry = await db_update_workspace(
        workspace_id=workspace_id,
        workspace_mets_path=workspace_mets_path,
        **kwargs
    )
    return updated_entry
140
141
142
async def db_create_processing_job(db_processing_job: DBProcessorJob) -> DBProcessorJob:
    """ Insert the given processing job into the database and return the result of the insert
    """
    inserted_job = await db_processing_job.insert()
    return inserted_job
144
145
146
@call_sync
async def sync_db_create_processing_job(db_processing_job: DBProcessorJob) -> DBProcessorJob:
    """ `call_sync`-wrapped variant of `db_create_processing_job`.
    """
    inserted_job = await db_create_processing_job(db_processing_job=db_processing_job)
    return inserted_job
149
150
151
async def db_get_processing_job(job_id: str) -> DBProcessorJob:
    """ Return the processing job with the given `job_id`

    Raises `ValueError` when no such job is stored.
    """
    query = DBProcessorJob.job_id == job_id
    found_job = await DBProcessorJob.find_one(query)
    if found_job:
        return found_job
    raise ValueError(f'Processing job with id "{job_id}" not in the DB.')
157
158
159
@call_sync
async def sync_db_get_processing_job(job_id: str) -> DBProcessorJob:
    """ `call_sync`-wrapped variant of `db_get_processing_job`.
    """
    found_job = await db_get_processing_job(job_id=job_id)
    return found_job
162
163
164
# Fields of a processing job that `db_update_processing_job` is allowed to overwrite.
_UPDATABLE_PROCESSING_JOB_FIELDS = frozenset({
    'state',
    'start_time',
    'end_time',
    'path_to_mets',
    'exec_time',
    'log_file_path',
})


async def db_update_processing_job(job_id: str, **kwargs) -> DBProcessorJob:
    """ Update fields of an existing processing job and save it

    The job is located exactly as in `db_get_processing_job` (`ValueError` when
    no job with `job_id` is stored). Every keyword argument names a field to
    overwrite with the given value.

    Raises `ValueError` when a keyword is not an attribute of the job
    ("not available") or is not one of the updatable fields ("not updatable").
    The job is only saved after all keywords were accepted.
    """
    # Delegate the lookup instead of duplicating it; the message is identical.
    job = await db_get_processing_job(job_id)

    available_fields = set(job.__dict__)
    for key, value in kwargs.items():
        if key not in available_fields:
            raise ValueError(f'Field "{key}" is not available.')
        if key not in _UPDATABLE_PROCESSING_JOB_FIELDS:
            raise ValueError(f'Field "{key}" is not updatable.')
        setattr(job, key, value)
    await job.save()
    return job
190
191
192
@call_sync
async def sync_db_update_processing_job(job_id: str, **kwargs) -> DBProcessorJob:
    """ `call_sync`-wrapped variant of `db_update_processing_job`.
    """
    updated_job = await db_update_processing_job(job_id=job_id, **kwargs)
    return updated_job
195
196
197
async def db_create_workflow_job(db_workflow_job: DBWorkflowJob) -> DBWorkflowJob:
    """ Insert the given workflow job into the database and return the result of the insert
    """
    inserted_job = await db_workflow_job.insert()
    return inserted_job
199
200
201
@call_sync
async def sync_db_create_workflow_job(db_workflow_job: DBWorkflowJob) -> DBWorkflowJob:
    """ `call_sync`-wrapped variant of `db_create_workflow_job`.
    """
    inserted_job = await db_create_workflow_job(db_workflow_job=db_workflow_job)
    return inserted_job
204
205
206
async def db_get_workflow_job(job_id: str) -> DBWorkflowJob:
    """ Return the workflow job with the given `job_id`

    Raises `ValueError` when no such job is stored.
    """
    query = DBWorkflowJob.job_id == job_id
    found_job = await DBWorkflowJob.find_one(query)
    if found_job:
        return found_job
    raise ValueError(f'Workflow job with id "{job_id}" not in the DB.')
211
212
213
@call_sync
async def sync_db_get_workflow_job(job_id: str) -> DBWorkflowJob:
    """ `call_sync`-wrapped variant of `db_get_workflow_job`.
    """
    found_job = await db_get_workflow_job(job_id=job_id)
    return found_job
216
217
218
async def db_get_processing_jobs(job_ids: List[str]) -> List[DBProcessorJob]:
    """ Return all processing jobs whose `job_id` is contained in `job_ids`

    Unlike `db_get_processing_job`, this function itself raises nothing for
    ids without a stored job; it returns whatever the query yields.
    """
    return await DBProcessorJob.find(In(DBProcessorJob.job_id, job_ids)).to_list()
221
222
223
@call_sync
async def sync_db_get_processing_jobs(job_ids: List[str]) -> List[DBProcessorJob]:
    """ `call_sync`-wrapped variant of `db_get_processing_jobs`.
    """
    return await db_get_processing_jobs(job_ids)
226
227
228
async def db_create_workflow_script(db_workflow_script: DBWorkflowScript) -> DBWorkflowScript:
    """ Insert the given workflow script into the database and return the result of the insert
    """
    inserted_script = await db_workflow_script.insert()
    return inserted_script
230
231
232
@call_sync
async def sync_db_create_workflow_script(db_workflow_script: DBWorkflowScript) -> DBWorkflowScript:
    """ `call_sync`-wrapped variant of `db_create_workflow_script`.
    """
    inserted_script = await db_create_workflow_script(db_workflow_script=db_workflow_script)
    return inserted_script
235
236
237
async def db_get_workflow_script(workflow_id: str) -> DBWorkflowScript:
    """ Return the workflow script with the given `workflow_id`

    Raises `ValueError` when no such script is stored.
    """
    query = DBWorkflowScript.workflow_id == workflow_id
    found_script = await DBWorkflowScript.find_one(query)
    if found_script:
        return found_script
    raise ValueError(f'Workflow-script with id "{workflow_id}" not in the DB.')
242
243
244
@call_sync
async def sync_db_get_workflow_script(workflow_id: str) -> DBWorkflowScript:
    """ `call_sync`-wrapped variant of `db_get_workflow_script`.
    """
    found_script = await db_get_workflow_script(workflow_id=workflow_id)
    return found_script
247
248
249
async def db_find_first_workflow_script_by_content(content_hash: str) -> DBWorkflowScript:
    """ Return one workflow script whose `content_hash` equals the given hash

    Raises `ValueError` when no such script is stored.
    """
    query = DBWorkflowScript.content_hash == content_hash
    found_script = await DBWorkflowScript.find_one(query)
    if found_script:
        return found_script
    raise ValueError(f'Workflow-script with content_hash "{content_hash}" not in the DB.')
254
255
256
# NOTE: the parameter keeps its historical name `workflow_id` so that existing
# keyword callers keep working, but its value is the content hash to search for.
@call_sync
async def sync_db_find_first_workflow_script_by_content(workflow_id: str) -> DBWorkflowScript:
    """ `call_sync`-wrapped variant of `db_find_first_workflow_script_by_content`.

    `workflow_id` is passed on as the `content_hash` to look up. Previously this
    wrapper delegated to `db_get_workflow_script` and therefore searched by
    workflow id, contradicting its name and its async counterpart.
    """
    return await db_find_first_workflow_script_by_content(content_hash=workflow_id)
260