Passed
Pull Request — master (#1083)
by
unknown
02:42
created

ocrd_network.database   B

Complexity

Total Complexity 46

Size/Duplication

Total Lines 177
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 46
eloc 128
dl 0
loc 177
rs 8.72
c 0
b 0
f 0

11 Functions

Rating   Name   Duplication   Size   Complexity  
A initiate_database() 0 5 1
A sync_initiate_database() 0 3 1
A db_create_workspace() 0 17 3
B db_get_workspace() 0 17 7
A sync_db_get_workspace() 0 3 1
A sync_db_update_processing_job() 0 3 1
F db_update_workspace() 0 45 19
C db_update_processing_job() 0 24 9
A db_get_processing_job() 0 6 2
A sync_db_get_processing_job() 0 3 1
A sync_db_update_workspace() 0 3 1

How to fix   Complexity   

Complexity

Complex modules like ocrd_network.database often do a lot of different things. To break such a module down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields/functions that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
""" The database is used to store information regarding jobs and workspaces.
2
3
Jobs: for every process-request a job is inserted into the database with an uuid, status and
4
information about the process like parameters and file groups. It is mainly used to track the status
5
(`ocrd_network.models.job.StateEnum`) of a job so that the state of a job can be queried. Finished
6
jobs are not deleted from the database.
7
8
Workspaces: A job or a processor always runs on a workspace. So a processor needs the information
9
where the workspace is available. This information can be set with providing an absolute path or a
10
workspace_id. With the latter, the database is used to convert the workspace_id to a path.
11
12
XXX: Currently the information is not preserved after the processing-server shuts down as the
13
database (runs in docker) currently has no volume set.
14
"""
15
from beanie import init_beanie
16
from motor.motor_asyncio import AsyncIOMotorClient
17
from uuid import uuid4
18
from pathlib import Path
19
20
from .models import (
21
    DBProcessorJob,
22
    DBWorkspace
23
)
24
from .utils import call_sync
25
26
27
async def initiate_database(db_url: str):
    """ Connect to MongoDB at `db_url` and register the beanie document models

    The database named in `db_url` is used; if the URL names none, the database `ocrd` is used.
    Both `DBProcessorJob` and `DBWorkspace` are registered as document models.
    """
    mongo_client = AsyncIOMotorClient(db_url)
    target_db = mongo_client.get_default_database(default='ocrd')
    models = [DBProcessorJob, DBWorkspace]
    await init_beanie(database=target_db, document_models=models)
33
34
35
@call_sync
async def sync_initiate_database(db_url: str):
    """ `call_sync`-wrapped counterpart of `initiate_database`
    """
    await initiate_database(db_url=db_url)
38
39
40
async def db_create_workspace(mets_path: str) -> DBWorkspace:
    """ Create a workspace-database entry only from a mets-path

    If an entry whose `workspace_mets_path` equals `mets_path` already exists, it is returned
    unchanged. Otherwise a new entry with a random UUID4 `workspace_id` is created, saved and
    returned.

    :param mets_path: path to the METS file; it must exist on the file system
    :return: the existing or newly created `DBWorkspace`
    :raises ValueError: if `mets_path` does not exist
    """
    mets_file = Path(mets_path)
    if not mets_file.exists():
        raise ValueError(f'Cannot create DB workspace entry, `{mets_path}` does not exist!')
    try:
        return await db_get_workspace(workspace_mets_path=mets_path)
    except ValueError:
        # No entry for this exact mets-path string yet: register a fresh one.
        fresh_entry = DBWorkspace(
            workspace_id=str(uuid4()),
            # NOTE(review): a `Path` (not a `str`) is passed here; confirm the model field accepts it
            workspace_path=mets_file.parent,
            workspace_mets_path=mets_path,
            ocrd_identifier="",
            bagit_profile_identifier="",
        )
        await fresh_entry.save()
        return fresh_entry
57
58
59
async def db_get_workspace(workspace_id: str = None, workspace_mets_path: str = None) -> DBWorkspace:
    """ Fetch a workspace-database entry by `workspace_id` and/or `workspace_mets_path`

    At least one search key must be given. If both are given, each must match an entry, and
    the entry found by `workspace_mets_path` is the one returned.

    :raises ValueError: if no search key is given or a given key matches no entry
    """
    if not (workspace_id or workspace_mets_path):
        raise ValueError('Either `workspace_id` or `workspace_mets_path` field must be used as a search key')
    found = None
    if workspace_id:
        found = await DBWorkspace.find_one(DBWorkspace.workspace_id == workspace_id)
        if not found:
            raise ValueError(f'Workspace with id "{workspace_id}" not in the DB.')
    if workspace_mets_path:
        # Deliberately overrides the id-based result when both keys were supplied.
        found = await DBWorkspace.find_one(DBWorkspace.workspace_mets_path == workspace_mets_path)
        if not found:
            raise ValueError(f'Workspace with path "{workspace_mets_path}" not in the DB.')
    return found
76
77
78
@call_sync
async def sync_db_get_workspace(workspace_id: str = None, workspace_mets_path: str = None) -> DBWorkspace:
    """ `call_sync`-wrapped counterpart of `db_get_workspace` (same lookup rules and errors)
    """
    search_keys = {'workspace_id': workspace_id, 'workspace_mets_path': workspace_mets_path}
    return await db_get_workspace(**search_keys)
81
82
83
# Fields of a workspace entry that `db_update_workspace` is allowed to overwrite.
_WORKSPACE_UPDATABLE_FIELDS = frozenset({
    'workspace_id',
    'workspace_mets_path',
    'ocrd_identifier',
    'bagit_profile_identifier',
    'ocrd_base_version_checksum',
    'ocrd_mets',
    'bag_info_adds',
    'deleted',
    'pages_locked',
    'mets_server_url',
})


async def db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs) -> DBWorkspace:
    """ Overwrite fields of an existing workspace-database entry and save it

    The entry is looked up with `db_get_workspace`: by `workspace_id` and/or
    `workspace_mets_path`. If both are given, both must match, and the entry found by
    `workspace_mets_path` is updated.

    :param workspace_id: id of the workspace to update
    :param workspace_mets_path: mets-path of the workspace to update
    :param kwargs: field names mapped to their new values
    :return: the updated and saved `DBWorkspace`
    :raises ValueError: if no search key is given, the entry is not found, a field does not
        exist on the entry, or a field is not in the updatable allowlist
    """
    workspace = await db_get_workspace(workspace_id=workspace_id, workspace_mets_path=workspace_mets_path)

    # Validate every key before mutating anything, so a rejected call never leaves the fetched
    # document half-modified. Keys are checked in call order with the same two checks as
    # before, so the first offending key still produces the same error.
    available_fields = workspace.__dict__.keys()
    for key in kwargs:
        if key not in available_fields:
            raise ValueError(f'Field "{key}" is not available.')
        if key not in _WORKSPACE_UPDATABLE_FIELDS:
            raise ValueError(f'Field "{key}" is not updatable.')

    for key, value in kwargs.items():
        setattr(workspace, key, value)
    await workspace.save()
    return workspace
128
129
130
@call_sync
async def sync_db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs) -> DBWorkspace:
    """ `call_sync`-wrapped counterpart of `db_update_workspace` (same arguments and errors)
    """
    updated = await db_update_workspace(workspace_id, workspace_mets_path, **kwargs)
    return updated
133
134
135
async def db_get_processing_job(job_id: str) -> DBProcessorJob:
    """ Fetch the processing-job database entry with the given `job_id`

    :raises ValueError: if no job with this id is in the DB
    """
    query = DBProcessorJob.job_id == job_id
    found = await DBProcessorJob.find_one(query)
    if found:
        return found
    raise ValueError(f'Processing job with id "{job_id}" not in the DB.')
141
142
143
@call_sync
async def sync_db_get_processing_job(job_id: str) -> DBProcessorJob:
    """ `call_sync`-wrapped counterpart of `db_get_processing_job`
    """
    found = await db_get_processing_job(job_id=job_id)
    return found
146
147
148
# Fields of a processing-job entry that `db_update_processing_job` is allowed to overwrite.
_PROCESSING_JOB_UPDATABLE_FIELDS = frozenset({
    'state',
    'start_time',
    'end_time',
    'path_to_mets',
    'exec_time',
})


async def db_update_processing_job(job_id: str, **kwargs) -> DBProcessorJob:
    """ Overwrite fields of an existing processing-job database entry and save it

    :param job_id: id of the job to update (looked up with `db_get_processing_job`)
    :param kwargs: field names mapped to their new values
    :return: the updated and saved `DBProcessorJob`
    :raises ValueError: if the job is not found, a field does not exist on the entry, or a
        field is not in the updatable allowlist
    """
    job = await db_get_processing_job(job_id)

    # Validate every key before mutating anything, so a rejected call never leaves the fetched
    # document half-modified. Keys are checked in call order with the same two checks as
    # before, so the first offending key still produces the same error.
    available_fields = job.__dict__.keys()
    for key in kwargs:
        if key not in available_fields:
            raise ValueError(f'Field "{key}" is not available.')
        if key not in _PROCESSING_JOB_UPDATABLE_FIELDS:
            raise ValueError(f'Field "{key}" is not updatable.')

    for key, value in kwargs.items():
        setattr(job, key, value)
    await job.save()
    return job
172
173
174
@call_sync
async def sync_db_update_processing_job(job_id: str, **kwargs) -> DBProcessorJob:
    """ `call_sync`-wrapped counterpart of `db_update_processing_job` (same arguments and errors)
    """
    updated = await db_update_processing_job(job_id, **kwargs)
    return updated
177