ocrd_network.database.sync_initiate_database()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 2
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
""" The database is used to store information regarding jobs and workspaces.
2
3
Jobs: for every process-request a job is inserted into the database with an uuid, status and
4
information about the process like parameters and file groups. It is mainly used to track the status
5
(`ocrd_network.constants.JobState`) of a job so that the state of a job can be queried. Finished
6
jobs are not deleted from the database.
7
8
Workspaces: A job or a processor always runs on a workspace. So a processor needs the information
9
where the workspace is available. This information can be set with providing an absolute path or a
10
workspace_id. With the latter, the database is used to convert the workspace_id to a path.
11
12
XXX: Currently the information is not preserved after the processing-server shuts down as the
13
database (runs in docker) currently has no volume set.
14
"""
15
from beanie import init_beanie
16
from beanie.operators import In
17
from pathlib import Path
18
from pymongo import AsyncMongoClient, MongoClient, uri_parser as mongo_uri_parser
19
from re import sub as re_sub
20
from typing import List
21
from uuid import uuid4
22
23
from .models import DBProcessorJob, DBWorkflowJob, DBWorkspace, DBWorkflowScript
24
from .utils import call_sync
25
26
27
async def initiate_database(db_url: str, db_name: str = 'ocrd'):
    """ Connect to MongoDB and register all OCR-D document models with Beanie.

    :param db_url: MongoDB connection string.
    :param db_name: database to use when ``db_url`` does not name one
        (passed as ``default`` to ``get_default_database``).
    """
    mongo_client = AsyncMongoClient(db_url)
    target_database = mongo_client.get_default_database(default=db_name)
    registered_models = [DBProcessorJob, DBWorkflowJob, DBWorkspace, DBWorkflowScript]
    await init_beanie(database=target_database, document_models=registered_models)
33
34
35
@call_sync
async def sync_initiate_database(db_url: str, db_name: str = 'ocrd'):
    """ Variant of :func:`initiate_database` wrapped by ``call_sync`` for synchronous callers."""
    await initiate_database(db_url=db_url, db_name=db_name)
38
39
40
async def db_create_workspace(mets_path: str) -> DBWorkspace:
    """ Create a workspace-database entry only from a mets-path.

    If :func:`db_get_workspace` already finds an entry for this METS path, that entry is
    returned unchanged. Otherwise a new entry with a random UUID4 ``workspace_id``, the
    METS file's parent directory as ``workspace_path`` and empty identifiers is saved.

    :param mets_path: path of the METS file; it must exist on the local filesystem.
    :returns: the existing or the newly created ``DBWorkspace``.
    :raises FileNotFoundError: if ``mets_path`` does not exist.
    """
    mets_file = Path(mets_path)
    if not mets_file.exists():
        raise FileNotFoundError(f'Cannot create DB workspace entry, `{mets_path}` does not exist!')
    try:
        return await db_get_workspace(workspace_mets_path=mets_path)
    except ValueError:
        # db_get_workspace reports "no such entry" with ValueError -> create one below,
        # outside the except block so creation errors are not chained to this lookup miss.
        pass
    workspace_db = DBWorkspace(
        workspace_id=str(uuid4()),
        # Store a plain string like the other path field instead of a ``Path`` object,
        # so the stored value does not depend on the model coercing the type.
        workspace_path=str(mets_file.parent),
        workspace_mets_path=mets_path,
        ocrd_identifier="",
        bagit_profile_identifier="",
    )
    await workspace_db.save()
    return workspace_db
57
58
59
@call_sync
async def sync_db_create_workspace(mets_path: str) -> DBWorkspace:
    """ Variant of :func:`db_create_workspace` wrapped by ``call_sync`` for synchronous callers."""
    created_or_existing = await db_create_workspace(mets_path=mets_path)
    return created_or_existing
62
63
64
async def db_get_workspace(workspace_id: str = None, workspace_mets_path: str = None) -> DBWorkspace:
    """ Fetch one workspace entry by ``workspace_id`` and/or ``workspace_mets_path``.

    Every key that is given must match an entry. When both keys are given, both lookups
    run and the entry found by ``workspace_mets_path`` is the one returned.

    :raises ValueError: if neither key is given, or a given key matches no entry.
    """
    if not (workspace_id or workspace_mets_path):
        raise ValueError('Either `workspace_id` or `workspace_mets_path` field must be used as a search key')

    found = None
    if workspace_id:
        found = await DBWorkspace.find_one(DBWorkspace.workspace_id == workspace_id)
        if not found:
            raise ValueError(f'Workspace with id "{workspace_id}" not in the DB.')
    if workspace_mets_path:
        found = await DBWorkspace.find_one(DBWorkspace.workspace_mets_path == workspace_mets_path)
        if not found:
            raise ValueError(f'Workspace with path "{workspace_mets_path}" not in the DB.')
    return found
81
82
83
@call_sync
async def sync_db_get_workspace(workspace_id: str = None, workspace_mets_path: str = None) -> DBWorkspace:
    """ Variant of :func:`db_get_workspace` wrapped by ``call_sync`` for synchronous callers."""
    search_keys = {'workspace_id': workspace_id, 'workspace_mets_path': workspace_mets_path}
    return await db_get_workspace(**search_keys)
86
87
88
async def db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs) -> DBWorkspace:
    """ Overwrite fields of one workspace entry and save it.

    The entry is located with :func:`db_get_workspace` (by ``workspace_id`` and/or
    ``workspace_mets_path``). Each keyword argument names a field and its new value.
    All keys are validated before anything is modified.

    Updatable fields: ``workspace_id``, ``workspace_mets_path``, ``ocrd_identifier``,
    ``bagit_profile_identifier``, ``ocrd_base_version_checksum``, ``ocrd_mets``,
    ``bag_info_adds``, ``deleted``, ``mets_server_url`` (``workspace_path`` is not).

    :returns: the saved ``DBWorkspace``.
    :raises ValueError: if no search key is given, no entry matches, a keyword is not a
        field of the entry ("not available"), or the field may not be changed
        ("not updatable").
    """
    workspace = await db_get_workspace(workspace_id=workspace_id, workspace_mets_path=workspace_mets_path)

    updatable_fields = frozenset((
        'workspace_id', 'workspace_mets_path', 'ocrd_identifier', 'bagit_profile_identifier',
        'ocrd_base_version_checksum', 'ocrd_mets', 'bag_info_adds', 'deleted', 'mets_server_url',
    ))
    available_fields = set(workspace.__dict__)
    # Check every key up front (same checks, same order as the per-key loop before)
    # so a bad key never leaves the fetched document half-updated in memory.
    for key in kwargs:
        if key not in available_fields:
            raise ValueError(f'Field "{key}" is not available.')
        if key not in updatable_fields:
            raise ValueError(f'Field "{key}" is not updatable.')
    for key, value in kwargs.items():
        setattr(workspace, key, value)
    await workspace.save()
    return workspace
127
128
129
@call_sync
async def sync_db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs) -> DBWorkspace:
    """ Variant of :func:`db_update_workspace` wrapped by ``call_sync``; all arguments are forwarded."""
    updated_workspace = await db_update_workspace(
        workspace_id=workspace_id,
        workspace_mets_path=workspace_mets_path,
        **kwargs,
    )
    return updated_workspace
132
133
134
async def db_create_processing_job(db_processing_job: DBProcessorJob) -> DBProcessorJob:
    """ Insert a new processing-job document and return the result of its ``insert()``."""
    inserted_job = await db_processing_job.insert()
    return inserted_job
136
137
138
@call_sync
async def sync_db_create_processing_job(db_processing_job: DBProcessorJob) -> DBProcessorJob:
    """ Variant of :func:`db_create_processing_job` wrapped by ``call_sync`` for synchronous callers."""
    inserted_job = await db_create_processing_job(db_processing_job=db_processing_job)
    return inserted_job
141
142
143
async def db_get_processing_job(job_id: str) -> DBProcessorJob:
    """ Return the processing job stored under ``job_id``.

    :raises ValueError: if no processing job with that id exists.
    """
    id_matches = DBProcessorJob.job_id == job_id
    job = await DBProcessorJob.find_one(id_matches)
    if job:
        return job
    raise ValueError(f'Processing job with id "{job_id}" not in the DB.')
149
150
151
@call_sync
async def sync_db_get_processing_job(job_id: str) -> DBProcessorJob:
    """ Variant of :func:`db_get_processing_job` wrapped by ``call_sync`` for synchronous callers."""
    fetched_job = await db_get_processing_job(job_id=job_id)
    return fetched_job
154
155
156
async def db_update_processing_job(job_id: str, **kwargs) -> DBProcessorJob:
    """ Overwrite fields of a processing job and save it.

    The job is fetched with :func:`db_get_processing_job`. Each keyword argument names a
    field and its new value. All keys are validated before the job is modified.

    Updatable fields: ``state``, ``start_time``, ``end_time``, ``path_to_mets``,
    ``exec_time``, ``log_file_path``.

    :param job_id: id of the processing job to update.
    :returns: the saved ``DBProcessorJob``.
    :raises ValueError: if the job does not exist, a keyword is not a field of the job
        ("not available"), or the field may not be changed ("not updatable").
    """
    job = await db_get_processing_job(job_id)

    updatable_fields = frozenset((
        'state', 'start_time', 'end_time', 'path_to_mets', 'exec_time', 'log_file_path',
    ))
    available_fields = set(job.__dict__)
    # Check every key up front (same checks, same order as the per-key loop before)
    # so a bad key never leaves the fetched document half-updated in memory.
    for key in kwargs:
        if key not in available_fields:
            raise ValueError(f'Field "{key}" is not available.')
        if key not in updatable_fields:
            raise ValueError(f'Field "{key}" is not updatable.')
    for key, value in kwargs.items():
        setattr(job, key, value)
    await job.save()
    return job
182
183
184
@call_sync
async def sync_db_update_processing_job(job_id: str, **kwargs) -> DBProcessorJob:
    """ Variant of :func:`db_update_processing_job` wrapped by ``call_sync``; all arguments are forwarded."""
    updated_job = await db_update_processing_job(job_id=job_id, **kwargs)
    return updated_job
187
188
189
async def db_create_workflow_job(db_workflow_job: DBWorkflowJob) -> DBWorkflowJob:
    """ Insert a new workflow-job document and return the result of its ``insert()``."""
    inserted_job = await db_workflow_job.insert()
    return inserted_job
191
192
193
@call_sync
async def sync_db_create_workflow_job(db_workflow_job: DBWorkflowJob) -> DBWorkflowJob:
    """ Variant of :func:`db_create_workflow_job` wrapped by ``call_sync`` for synchronous callers."""
    inserted_job = await db_create_workflow_job(db_workflow_job=db_workflow_job)
    return inserted_job
196
197
198
async def db_get_workflow_job(job_id: str) -> DBWorkflowJob:
    """ Return the workflow job stored under ``job_id``.

    :raises ValueError: if no workflow job with that id exists.
    """
    id_matches = DBWorkflowJob.job_id == job_id
    job = await DBWorkflowJob.find_one(id_matches)
    if job:
        return job
    raise ValueError(f'Workflow job with id "{job_id}" not in the DB.')
203
204
205
@call_sync
async def sync_db_get_workflow_job(job_id: str) -> DBWorkflowJob:
    """ Variant of :func:`db_get_workflow_job` wrapped by ``call_sync`` for synchronous callers."""
    fetched_job = await db_get_workflow_job(job_id=job_id)
    return fetched_job
208
209
210
async def db_get_processing_jobs(job_ids: List[str]) -> List[DBProcessorJob]:
    """ Return every processing job whose ``job_id`` is contained in ``job_ids``.

    Ids without a stored job simply contribute nothing; the query applies no sort.
    """
    matching_jobs = DBProcessorJob.find(In(DBProcessorJob.job_id, job_ids))
    return await matching_jobs.to_list()
213
214
215
@call_sync
async def sync_db_get_processing_jobs(job_ids: List[str]) -> List[DBProcessorJob]:
    """ Variant of :func:`db_get_processing_jobs` wrapped by ``call_sync`` for synchronous callers."""
    fetched_jobs = await db_get_processing_jobs(job_ids=job_ids)
    return fetched_jobs
218
219
220
async def db_create_workflow_script(db_workflow_script: DBWorkflowScript) -> DBWorkflowScript:
    """ Insert a new workflow-script document and return the result of its ``insert()``."""
    inserted_script = await db_workflow_script.insert()
    return inserted_script
222
223
224
@call_sync
async def sync_db_create_workflow_script(db_workflow_script: DBWorkflowScript) -> DBWorkflowScript:
    """ Variant of :func:`db_create_workflow_script` wrapped by ``call_sync`` for synchronous callers."""
    inserted_script = await db_create_workflow_script(db_workflow_script=db_workflow_script)
    return inserted_script
227
228
229
async def db_get_workflow_script(workflow_id: str) -> DBWorkflowScript:
    """ Return the workflow script stored under ``workflow_id``.

    :raises ValueError: if no workflow script with that id exists.
    """
    id_matches = DBWorkflowScript.workflow_id == workflow_id
    script = await DBWorkflowScript.find_one(id_matches)
    if script:
        return script
    raise ValueError(f'Workflow-script with id "{workflow_id}" not in the DB.')
234
235
236
@call_sync
async def sync_db_get_workflow_script(workflow_id: str) -> DBWorkflowScript:
    """ Variant of :func:`db_get_workflow_script` wrapped by ``call_sync`` for synchronous callers."""
    fetched_script = await db_get_workflow_script(workflow_id=workflow_id)
    return fetched_script
239
240
241
async def db_find_first_workflow_script_by_content(content_hash: str) -> DBWorkflowScript:
    """ Return the first stored workflow script whose ``content_hash`` equals ``content_hash``.

    :raises ValueError: if no workflow script has that content hash.
    """
    hash_matches = DBWorkflowScript.content_hash == content_hash
    script = await DBWorkflowScript.find_one(hash_matches)
    if script:
        return script
    raise ValueError(f'Workflow-script with content_hash "{content_hash}" not in the DB.')
246
247
248
@call_sync
async def sync_db_find_first_workflow_script_by_content(content_hash: str = None, workflow_id: str = None) -> DBWorkflowScript:
    """ Variant of :func:`db_find_first_workflow_script_by_content` wrapped by ``call_sync``.

    Looks up the first workflow script whose ``content_hash`` matches, like the async
    function of the same name.

    Backward compatibility: this function previously took a single parameter named
    ``workflow_id`` and, inconsistently with its name, looked the script up by id via
    :func:`db_get_workflow_script`. Callers that still pass ``workflow_id=...`` by keyword
    (and no ``content_hash``) keep that id-based lookup.

    :raises ValueError: if neither argument is given, or no matching script is stored.
    """
    if content_hash is not None:
        return await db_find_first_workflow_script_by_content(content_hash)
    if workflow_id is not None:
        # Legacy keyword call: preserve the old id-based behaviour for existing callers.
        return await db_get_workflow_script(workflow_id)
    raise ValueError('Either `content_hash` or `workflow_id` must be given')
252
253
254
def verify_database_uri(mongodb_address: str) -> str:
    """ Validate a MongoDB connection string with pymongo's URI parser.

    :param mongodb_address: the connection string to check.
    :returns: ``mongodb_address`` unchanged when it is valid.
    :raises ValueError: if parsing/validation fails. Any ``user:password@`` part of the
        address is masked in the message, and the parser's exception is attached as
        ``__cause__``.
    """
    try:
        # perform validation check
        mongo_uri_parser.parse_uri(uri=mongodb_address, validate=True)
    except Exception as error:
        # Don't leak credentials into logs: mask the userinfo part of the URI authority
        # (everything between '://' and the last '@' before the path/query/fragment).
        masked_address = re_sub(r"://[^/?#]*@", "://****@", mongodb_address)
        raise ValueError(f"The MongoDB address '{masked_address}' is in wrong format, {error}") from error
    return mongodb_address
261
262
263
def verify_mongodb_available(mongo_url: str) -> None:
    """ Check that a MongoDB server answers at ``mongo_url``.

    Creates a synchronous ``MongoClient`` with a 60 s server-selection timeout, sends a
    ``ping`` command to the ``admin`` database and always closes the client afterwards.

    :param mongo_url: MongoDB connection string.
    :raises RuntimeError: if the client cannot be created or the command fails. Any
        ``user:password@`` part of the URL is masked in the message; the original
        exception is attached as ``__cause__``.
    """
    client = None
    try:
        client = MongoClient(mongo_url, serverSelectionTimeoutMS=60000.0)
        # 'ping' is the connectivity check recommended by PyMongo ('ismaster' is deprecated).
        client.admin.command("ping")
    except Exception as error:
        # Mask the userinfo part of the URI authority (between '://' and the last '@'
        # before the path/query/fragment) while keeping the scheme and host readable.
        masked_url = re_sub(r"://[^/?#]*@", "://****@", mongo_url)
        raise RuntimeError(f'Cannot connect to MongoDB: {masked_url}') from error
    finally:
        # Release the client's connections/resources; it is only needed for this check.
        if client is not None:
            client.close()
277