Passed
Push — master ( 35fa77...ea583d )
by Konstantin
05:50
created

ocrd_network.database.initiate_database()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 5
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
""" The database is used to store information regarding jobs and workspaces.
2
3
Jobs: for every process-request a job is inserted into the database with a uuid, status and
4
information about the process like parameters and file groups. It is mainly used to track the status
5
(`ocrd_network.models.job.StateEnum`) of a job so that the state of a job can be queried. Finished
6
jobs are not deleted from the database.
7
8
Workspaces: A job or a processor always runs on a workspace. So a processor needs the information
9
where the workspace is available. This information can be set by providing an absolute path or a
10
workspace_id. With the latter, the database is used to convert the workspace_id to a path.
11
12
XXX: Currently the information is not preserved after the processing-server shuts down as the
13
database (runs in docker) currently has no volume set.
14
"""
15
from beanie import init_beanie
16
from beanie.operators import In
17
from motor.motor_asyncio import AsyncIOMotorClient
18
from uuid import uuid4
19
from pathlib import Path
20
from typing import List
21
22
from .models import (
23
    DBProcessorJob,
24
    DBWorkflowJob,
25
    DBWorkspace,
26
    DBWorkflowScript,
27
)
28
from .utils import call_sync
29
30
31
async def initiate_database(db_url: str):
    """ Initialize beanie with the document models used by this module

    A motor client is created for `db_url`; the database handle is taken from
    `get_default_database`, with `ocrd` passed as the default database name.
    """
    document_models = [DBProcessorJob, DBWorkflowJob, DBWorkspace, DBWorkflowScript]
    database = AsyncIOMotorClient(db_url).get_default_database(default='ocrd')
    await init_beanie(database=database, document_models=document_models)
37
38
39
@call_sync
async def sync_initiate_database(db_url: str):
    """ `initiate_database` wrapped with the `call_sync` decorator

    NOTE(review): `call_sync` comes from `.utils`; presumably it lets synchronous code
    call this coroutine - confirm there.
    """
    await initiate_database(db_url=db_url)
42
43
44
async def db_create_workspace(mets_path: str) -> DBWorkspace:
    """ Return the workspace-database entry for a mets-path, creating it when missing

    Raises a `FileNotFoundError` if `mets_path` does not exist on disk. If an entry for
    the mets-path is already stored it is returned unchanged.
    """
    mets_file = Path(mets_path)
    if not mets_file.exists():
        raise FileNotFoundError(f'Cannot create DB workspace entry, `{mets_path}` does not exist!')
    try:
        known_workspace = await db_get_workspace(workspace_mets_path=mets_path)
    except ValueError:
        # `db_get_workspace` reports a missing entry with a ValueError: insert a new one
        new_workspace = DBWorkspace(
            workspace_id=str(uuid4()),
            workspace_path=mets_file.parent,
            workspace_mets_path=mets_path,
            ocrd_identifier="",
            bagit_profile_identifier="",
        )
        await new_workspace.save()
        return new_workspace
    return known_workspace
61
62
63
async def db_get_workspace(workspace_id: str = None, workspace_mets_path: str = None) -> DBWorkspace:
    """ Look up a workspace entry by its id and/or by the path of its mets file

    At least one search key has to be given, otherwise a `ValueError` is raised. A
    `ValueError` is raised as well when a given key has no entry in the DB. If both keys
    are given, both lookups are executed and the entry found via `workspace_mets_path`
    is the one returned.
    """
    if not (workspace_id or workspace_mets_path):
        raise ValueError('Either `workspace_id` or `workspace_mets_path` field must be used as a search key')
    found = None
    if workspace_id:
        found = await DBWorkspace.find_one(DBWorkspace.workspace_id == workspace_id)
        if not found:
            raise ValueError(f'Workspace with id "{workspace_id}" not in the DB.')
    if workspace_mets_path:
        found = await DBWorkspace.find_one(DBWorkspace.workspace_mets_path == workspace_mets_path)
        if not found:
            raise ValueError(f'Workspace with path "{workspace_mets_path}" not in the DB.')
    return found
80
81
82
@call_sync
async def sync_db_get_workspace(workspace_id: str = None, workspace_mets_path: str = None) -> DBWorkspace:
    """ `db_get_workspace` wrapped with the `call_sync` decorator

    Both search keys are handed on unchanged.
    """
    workspace = await db_get_workspace(
        workspace_id=workspace_id,
        workspace_mets_path=workspace_mets_path,
    )
    return workspace
85
86
87
async def db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs) -> DBWorkspace:
    """ Overwrite fields of a stored workspace entry and save it

    The entry is located with `db_get_workspace`, so the same search-key rules apply:
    a `ValueError` is raised when no search key is given or no matching entry exists.

    Every keyword argument names a field and carries its new value. A `ValueError` is
    raised for a key that is not an attribute of the entry ("not available") and for an
    attribute outside the updatable set ("not updatable"). An invalid key aborts before
    `save()` is called.

    Returns the saved workspace entry.
    """
    # `workspace_id` and `workspace_mets_path` are bound as named parameters and thus can
    # never show up in `kwargs`, so they are not part of this set.
    updatable_fields = {
        'ocrd_identifier',
        'bagit_profile_identifier',
        'ocrd_base_version_checksum',
        'ocrd_mets',
        'bag_info_adds',
        'deleted',
        'mets_server_url',
    }
    workspace = await db_get_workspace(workspace_id=workspace_id, workspace_mets_path=workspace_mets_path)

    for key, value in kwargs.items():
        if key not in workspace.__dict__:
            raise ValueError(f'Field "{key}" is not available.')
        if key not in updatable_fields:
            raise ValueError(f'Field "{key}" is not updatable.')
        setattr(workspace, key, value)
    await workspace.save()
    return workspace
130
131
132
@call_sync
async def sync_db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs) -> DBWorkspace:
    """ `db_update_workspace` wrapped with the `call_sync` decorator

    Search keys and field updates (`kwargs`) are handed on unchanged.
    """
    workspace = await db_update_workspace(
        workspace_id=workspace_id,
        workspace_mets_path=workspace_mets_path,
        **kwargs
    )
    return workspace
135
136
137
async def db_get_processing_job(job_id: str) -> DBProcessorJob:
    """ Fetch the processing job stored under `job_id`

    Raises a `ValueError` when no such job is in the DB.
    """
    found = await DBProcessorJob.find_one(DBProcessorJob.job_id == job_id)
    if found:
        return found
    raise ValueError(f'Processing job with id "{job_id}" not in the DB.')
143
144
145
@call_sync
async def sync_db_get_processing_job(job_id: str) -> DBProcessorJob:
    """ `db_get_processing_job` wrapped with the `call_sync` decorator
    """
    job = await db_get_processing_job(job_id)
    return job
148
149
150
async def db_update_processing_job(job_id: str, **kwargs) -> DBProcessorJob:
    """ Overwrite fields of a stored processing job and save it

    The job is located with `db_get_processing_job`, which raises a `ValueError` when
    `job_id` is not in the DB.

    Every keyword argument names a field and carries its new value. A `ValueError` is
    raised for a key that is not an attribute of the job ("not available") and for an
    attribute outside the updatable set ("not updatable"). An invalid key aborts before
    `save()` is called.

    Returns the saved job.
    """
    updatable_fields = {
        'state',
        'start_time',
        'end_time',
        'path_to_mets',
        'exec_time',
        'log_file_path',
    }
    job = await db_get_processing_job(job_id)

    for key, value in kwargs.items():
        if key not in job.__dict__:
            raise ValueError(f'Field "{key}" is not available.')
        if key not in updatable_fields:
            raise ValueError(f'Field "{key}" is not updatable.')
        setattr(job, key, value)
    await job.save()
    return job
176
177
178
@call_sync
async def sync_db_update_processing_job(job_id: str, **kwargs) -> DBProcessorJob:
    """ `db_update_processing_job` wrapped with the `call_sync` decorator

    The field updates in `kwargs` are handed on unchanged.
    """
    job = await db_update_processing_job(job_id=job_id, **kwargs)
    return job
181
182
183
async def db_get_workflow_job(job_id: str) -> DBWorkflowJob:
    """ Fetch the workflow job stored under `job_id`

    Raises a `ValueError` when no such job is in the DB.
    """
    found = await DBWorkflowJob.find_one(DBWorkflowJob.job_id == job_id)
    if found:
        return found
    raise ValueError(f'Workflow job with id "{job_id}" not in the DB.')
188
189
190
@call_sync
async def sync_db_get_workflow_job(job_id: str) -> DBWorkflowJob:
    """ `db_get_workflow_job` wrapped with the `call_sync` decorator
    """
    job = await db_get_workflow_job(job_id)
    return job
193
194
195
async def db_get_processing_jobs(job_ids: List[str]) -> List[DBProcessorJob]:
    """ Fetch all processing jobs whose `job_id` is contained in `job_ids`

    The result of the query is returned as a list. This function itself raises no error
    for ids that have no match.
    """
    return await DBProcessorJob.find(In(DBProcessorJob.job_id, job_ids)).to_list()
198
199
200
@call_sync
async def sync_db_get_processing_jobs(job_ids: List[str]) -> List[DBProcessorJob]:
    """ `db_get_processing_jobs` wrapped with the `call_sync` decorator
    """
    return await db_get_processing_jobs(job_ids)
203
204
205
async def db_get_workflow_script(workflow_id: str) -> DBWorkflowScript:
    """ Fetch the workflow-script stored under `workflow_id`

    Raises a `ValueError` when no such workflow-script is in the DB.
    """
    script = await DBWorkflowScript.find_one(DBWorkflowScript.workflow_id == workflow_id)
    if script:
        return script
    raise ValueError(f'Workflow-script with id "{workflow_id}" not in the DB.')
210
211
212
@call_sync
async def sync_db_get_workflow_script(workflow_id: str) -> DBWorkflowScript:
    """ `db_get_workflow_script` wrapped with the `call_sync` decorator
    """
    script = await db_get_workflow_script(workflow_id)
    return script
215
216
217
async def db_find_first_workflow_script_by_content(content_hash: str) -> DBWorkflowScript:
    """ Fetch one workflow-script whose `content_hash` field equals `content_hash`

    Raises a `ValueError` when no workflow-script with that hash is in the DB.
    """
    script = await DBWorkflowScript.find_one(DBWorkflowScript.content_hash == content_hash)
    if script:
        return script
    raise ValueError(f'Workflow-script with content_hash "{content_hash}" not in the DB.')
222
223
224
@call_sync
async def sync_db_find_first_workflow_script_by_content(workflow_id: str) -> DBWorkflowScript:
    """ `db_find_first_workflow_script_by_content` wrapped with the `call_sync` decorator

    The parameter keeps the name `workflow_id` so existing keyword callers continue to
    work, but its value is used as the content hash to search for. Previously the value
    was passed to `db_get_workflow_script`, i.e. the lookup was done by workflow id
    instead of by content hash.
    """
    return await db_find_first_workflow_script_by_content(content_hash=workflow_id)
227