ocrd_network.database.verify_database_uri()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 7
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 7
rs 10
c 0
b 0
f 0
cc 2
nop 1
1
""" The database is used to store information regarding jobs and workspaces.

Jobs: for every process-request a job is inserted into the database with a UUID, status and
information about the process like parameters and file groups. It is mainly used to track the status
(`ocrd_network.constants.JobState`) of a job so that the state of a job can be queried. Finished
jobs are not deleted from the database.

Workspaces: A job or a processor always runs on a workspace, so a processor needs to know where
the workspace is located. This information can be set by providing either an absolute path or a
workspace_id. With the latter, the database is used to convert the workspace_id to a path.

XXX: Currently the information is not preserved after the processing-server shuts down as the
database (runs in docker) currently has no volume set.
"""
15
from beanie import init_beanie
16
from beanie.operators import In
17
from motor.motor_asyncio import AsyncIOMotorClient
18
from pathlib import Path
19
from pymongo import MongoClient, uri_parser as mongo_uri_parser
20
from re import sub as re_sub
21
from typing import List
22
from uuid import uuid4
23
24
from .models import DBProcessorJob, DBWorkflowJob, DBWorkspace, DBWorkflowScript
25
from .utils import call_sync
26
27
28
async def initiate_database(db_url: str, db_name: str = 'ocrd'):
    """Connect to MongoDB and register this module's Beanie document models.

    :param db_url: MongoDB connection URI handed to ``AsyncIOMotorClient``.
    :param db_name: fallback database name passed to ``get_default_database``
        (used when ``db_url`` itself does not name a database).
    """
    motor_client = AsyncIOMotorClient(db_url)
    target_db = motor_client.get_default_database(default=db_name)
    models = [DBProcessorJob, DBWorkflowJob, DBWorkspace, DBWorkflowScript]
    await init_beanie(database=target_db, document_models=models)
34
35
36
@call_sync
async def sync_initiate_database(db_url: str, db_name: str = 'ocrd'):
    """Sync-callable variant of ``initiate_database``.

    ``call_sync`` (from ``.utils``, not shown here) is assumed to drive the
    coroutine to completion for non-async callers.
    """
    await initiate_database(db_url=db_url, db_name=db_name)
39
40
41
async def db_create_workspace(mets_path: str) -> DBWorkspace:
    """ Create a workspace-database entry only from a mets-path

    If an entry whose ``workspace_mets_path`` equals ``mets_path`` already exists,
    it is returned unchanged. Otherwise a new entry is saved with a random UUID4 as
    ``workspace_id``, the METS file's parent directory as ``workspace_path`` and
    empty ``ocrd_identifier`` / ``bagit_profile_identifier``.

    :param mets_path: path of the workspace's METS file; it must exist on disk.
    :return: the existing or the newly created entry.
    :raises FileNotFoundError: if ``mets_path`` is empty or does not exist.
    """
    # Path('') resolves to the current directory and therefore "exists". Without this
    # guard, db_get_workspace('') raises its "no search key" ValueError, which the
    # except-branch below would mistake for "not found" and save a bogus entry.
    if not mets_path or not Path(mets_path).exists():
        raise FileNotFoundError(f'Cannot create DB workspace entry, `{mets_path}` does not exist!')
    try:
        return await db_get_workspace(workspace_mets_path=mets_path)
    except ValueError:
        # db_get_workspace signals "no entry with this METS path" with ValueError
        workspace_db = DBWorkspace(
            workspace_id=str(uuid4()),
            # NOTE(review): a Path object is passed here; confirm DBWorkspace coerces it
            # to the declared field type (str(...) may be needed for a plain str field)
            workspace_path=Path(mets_path).parent,
            workspace_mets_path=mets_path,
            ocrd_identifier="",
            bagit_profile_identifier="",
        )
        await workspace_db.save()
        return workspace_db
58
59
60
@call_sync
async def sync_db_create_workspace(mets_path: str) -> DBWorkspace:
    """Sync-callable variant of ``db_create_workspace``.

    ``call_sync`` (from ``.utils``, not shown here) is assumed to run the
    coroutine to completion and hand back its result.
    """
    return await db_create_workspace(mets_path)
63
64
65
async def db_get_workspace(workspace_id: str = None, workspace_mets_path: str = None) -> DBWorkspace:
    """Look up a workspace entry by ``workspace_id`` and/or ``workspace_mets_path``.

    Each key that is given is queried separately and must match an entry. When both
    are given, the entry found via ``workspace_mets_path`` is the one returned; the
    two results are not compared with each other.

    :raises ValueError: if neither key is given, or a given key matches no entry.
    """
    if not (workspace_id or workspace_mets_path):
        raise ValueError('Either `workspace_id` or `workspace_mets_path` field must be used as a search key')
    found = None
    if workspace_id:
        found = await DBWorkspace.find_one(DBWorkspace.workspace_id == workspace_id)
        if not found:
            raise ValueError(f'Workspace with id "{workspace_id}" not in the DB.')
    if workspace_mets_path:
        # overrides the id-based result when both keys were supplied
        found = await DBWorkspace.find_one(DBWorkspace.workspace_mets_path == workspace_mets_path)
        if not found:
            raise ValueError(f'Workspace with path "{workspace_mets_path}" not in the DB.')
    return found
82
83
84
@call_sync
async def sync_db_get_workspace(workspace_id: str = None, workspace_mets_path: str = None) -> DBWorkspace:
    """Sync-callable variant of ``db_get_workspace`` (same keys, same errors).

    ``call_sync`` (from ``.utils``, not shown here) is assumed to run the
    coroutine to completion and hand back its result.
    """
    lookup = await db_get_workspace(workspace_id, workspace_mets_path)
    return lookup
87
88
89
async def db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs) -> DBWorkspace:
    """Update selected fields of a workspace entry and save it.

    The entry is resolved with ``db_get_workspace``, so the same search-key rules and
    error messages apply (if both keys are given, both must match an entry and the one
    found via ``workspace_mets_path`` is updated).

    Updatable fields: ``ocrd_identifier``, ``bagit_profile_identifier``,
    ``ocrd_base_version_checksum``, ``ocrd_mets``, ``bag_info_adds``, ``deleted`` and
    ``mets_server_url``. ``workspace_id`` and ``workspace_mets_path`` are also listed as
    updatable, but they can never arrive via ``kwargs`` because they bind to the
    search-key parameters of this function.

    :param workspace_id: search key, see ``db_get_workspace``.
    :param workspace_mets_path: search key, see ``db_get_workspace``.
    :param kwargs: field name -> new value.
    :return: the updated and saved entry.
    :raises ValueError: if no search key is given, no entry matches, a key is not a field
        of the entry, or a key names a field that may not be updated. Values applied
        before the offending key stay on the in-memory object only; nothing is saved.
    """
    updatable_fields = frozenset({
        'workspace_id', 'workspace_mets_path', 'ocrd_identifier', 'bagit_profile_identifier',
        'ocrd_base_version_checksum', 'ocrd_mets', 'bag_info_adds', 'deleted', 'mets_server_url',
    })
    # Delegate the lookup so queries and error messages stay in sync with db_get_workspace
    workspace = await db_get_workspace(workspace_id=workspace_id, workspace_mets_path=workspace_mets_path)
    existing_fields = set(workspace.__dict__)
    for key, value in kwargs.items():
        if key not in existing_fields:
            raise ValueError(f'Field "{key}" is not available.')
        if key not in updatable_fields:
            raise ValueError(f'Field "{key}" is not updatable.')
        setattr(workspace, key, value)
    await workspace.save()
    return workspace
128
129
130
@call_sync
async def sync_db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs) -> DBWorkspace:
    """Sync-callable variant of ``db_update_workspace``; extra keyword arguments are forwarded.

    ``call_sync`` (from ``.utils``, not shown here) is assumed to run the
    coroutine to completion and hand back its result.
    """
    updated = await db_update_workspace(workspace_id, workspace_mets_path, **kwargs)
    return updated
133
134
135
async def db_create_processing_job(db_processing_job: DBProcessorJob) -> DBProcessorJob:
    """Insert ``db_processing_job`` into the DB and return the result of its ``insert()``."""
    inserted = await db_processing_job.insert()
    return inserted
137
138
139
@call_sync
async def sync_db_create_processing_job(db_processing_job: DBProcessorJob) -> DBProcessorJob:
    """Sync-callable variant of ``db_create_processing_job``.

    ``call_sync`` (from ``.utils``, not shown here) is assumed to run the
    coroutine to completion and hand back its result.
    """
    created = await db_create_processing_job(db_processing_job)
    return created
142
143
144
async def db_get_processing_job(job_id: str) -> DBProcessorJob:
    """Fetch the processing job whose ``job_id`` equals ``job_id``.

    :raises ValueError: if no such job is stored.
    """
    found = await DBProcessorJob.find_one(DBProcessorJob.job_id == job_id)
    if found:
        return found
    raise ValueError(f'Processing job with id "{job_id}" not in the DB.')
150
151
152
@call_sync
async def sync_db_get_processing_job(job_id: str) -> DBProcessorJob:
    """Sync-callable variant of ``db_get_processing_job``.

    ``call_sync`` (from ``.utils``, not shown here) is assumed to run the
    coroutine to completion and hand back its result.
    """
    return await db_get_processing_job(job_id=job_id)
155
156
157
async def db_update_processing_job(job_id: str, **kwargs) -> DBProcessorJob:
    """Update selected fields of a processing job and save it.

    Updatable fields: ``state``, ``start_time``, ``end_time``, ``path_to_mets``,
    ``exec_time`` and ``log_file_path``.

    :param job_id: id of the job to update.
    :param kwargs: field name -> new value.
    :return: the updated and saved job.
    :raises ValueError: if the job is not in the DB, a key is not a field of the job,
        or a key names a field that may not be updated. Values applied before the
        offending key stay on the in-memory object only; nothing is saved.
    """
    updatable_fields = frozenset({
        'state', 'start_time', 'end_time', 'path_to_mets', 'exec_time', 'log_file_path',
    })
    # Same query and "not in the DB" message as db_get_processing_job
    job = await db_get_processing_job(job_id)
    existing_fields = set(job.__dict__)
    for key, value in kwargs.items():
        if key not in existing_fields:
            raise ValueError(f'Field "{key}" is not available.')
        if key not in updatable_fields:
            raise ValueError(f'Field "{key}" is not updatable.')
        setattr(job, key, value)
    await job.save()
    return job
183
184
185
@call_sync
async def sync_db_update_processing_job(job_id: str, **kwargs) -> DBProcessorJob:
    """Sync-callable variant of ``db_update_processing_job``; field updates are forwarded.

    ``call_sync`` (from ``.utils``, not shown here) is assumed to run the
    coroutine to completion and hand back its result.
    """
    updated = await db_update_processing_job(job_id, **kwargs)
    return updated
188
189
190
async def db_create_workflow_job(db_workflow_job: DBWorkflowJob) -> DBWorkflowJob:
    """Insert ``db_workflow_job`` into the DB and return the result of its ``insert()``."""
    stored = await db_workflow_job.insert()
    return stored
192
193
194
@call_sync
async def sync_db_create_workflow_job(db_workflow_job: DBWorkflowJob) -> DBWorkflowJob:
    """Sync-callable variant of ``db_create_workflow_job``.

    ``call_sync`` (from ``.utils``, not shown here) is assumed to run the
    coroutine to completion and hand back its result.
    """
    return await db_create_workflow_job(db_workflow_job)
197
198
199
async def db_get_workflow_job(job_id: str) -> DBWorkflowJob:
    """Fetch the workflow job whose ``job_id`` equals ``job_id``.

    :raises ValueError: if no such job is stored.
    """
    workflow_job = await DBWorkflowJob.find_one(DBWorkflowJob.job_id == job_id)
    if workflow_job:
        return workflow_job
    raise ValueError(f'Workflow job with id "{job_id}" not in the DB.')
204
205
206
@call_sync
async def sync_db_get_workflow_job(job_id: str) -> DBWorkflowJob:
    """Sync-callable variant of ``db_get_workflow_job``.

    ``call_sync`` (from ``.utils``, not shown here) is assumed to run the
    coroutine to completion and hand back its result.
    """
    return await db_get_workflow_job(job_id=job_id)
209
210
211
async def db_get_processing_jobs(job_ids: List[str]) -> List[DBProcessorJob]:
    """Return all processing jobs whose ``job_id`` is one of ``job_ids``.

    No error is raised for ids without a matching job; they simply yield nothing.
    """
    query = DBProcessorJob.find(In(DBProcessorJob.job_id, job_ids))
    return await query.to_list()
214
215
216
@call_sync
async def sync_db_get_processing_jobs(job_ids: List[str]) -> List[DBProcessorJob]:
    """Sync-callable variant of ``db_get_processing_jobs``.

    ``call_sync`` (from ``.utils``, not shown here) is assumed to run the
    coroutine to completion and hand back its result.
    """
    return await db_get_processing_jobs(job_ids=job_ids)
219
220
221
async def db_create_workflow_script(db_workflow_script: DBWorkflowScript) -> DBWorkflowScript:
    """Insert ``db_workflow_script`` into the DB and return the result of its ``insert()``."""
    stored = await db_workflow_script.insert()
    return stored
223
224
225
@call_sync
async def sync_db_create_workflow_script(db_workflow_script: DBWorkflowScript) -> DBWorkflowScript:
    """Sync-callable variant of ``db_create_workflow_script``.

    ``call_sync`` (from ``.utils``, not shown here) is assumed to run the
    coroutine to completion and hand back its result.
    """
    return await db_create_workflow_script(db_workflow_script)
228
229
230
async def db_get_workflow_script(workflow_id: str) -> DBWorkflowScript:
    """Fetch the workflow script whose ``workflow_id`` equals ``workflow_id``.

    :raises ValueError: if no such script is stored.
    """
    script = await DBWorkflowScript.find_one(DBWorkflowScript.workflow_id == workflow_id)
    if script:
        return script
    raise ValueError(f'Workflow-script with id "{workflow_id}" not in the DB.')
235
236
237
@call_sync
async def sync_db_get_workflow_script(workflow_id: str) -> DBWorkflowScript:
    """Sync-callable variant of ``db_get_workflow_script``.

    ``call_sync`` (from ``.utils``, not shown here) is assumed to run the
    coroutine to completion and hand back its result.
    """
    return await db_get_workflow_script(workflow_id=workflow_id)
240
241
242
async def db_find_first_workflow_script_by_content(content_hash: str) -> DBWorkflowScript:
    """Return a workflow script whose ``content_hash`` equals ``content_hash``.

    ``find_one`` yields a single match; no ordering is imposed here, so which one is
    returned when several scripts share the hash is up to the database.

    :raises ValueError: if no script with that hash is stored.
    """
    script = await DBWorkflowScript.find_one(DBWorkflowScript.content_hash == content_hash)
    if script:
        return script
    raise ValueError(f'Workflow-script with content_hash "{content_hash}" not in the DB.')
247
248
249
@call_sync
async def sync_db_find_first_workflow_script_by_content(content_hash: str = None, workflow_id: str = None) -> DBWorkflowScript:
    """Sync-callable variant of ``db_find_first_workflow_script_by_content``.

    The first positional argument is the content hash to search for, matching the
    async function. Earlier versions of this wrapper looked the script up by
    ``workflow_id`` instead. The ``workflow_id`` keyword is kept so callers that passed
    it explicitly keep that legacy by-id lookup; prefer ``sync_db_get_workflow_script``
    for lookups by id. If both are given, ``content_hash`` wins.

    ``call_sync`` (from ``.utils``, not shown here) is assumed to run the coroutine to
    completion and hand back its result.

    :raises ValueError: if neither argument is given, or no matching script is stored.
    """
    if content_hash is not None:
        return await db_find_first_workflow_script_by_content(content_hash)
    if workflow_id is not None:
        # Deprecated path: preserves the old behaviour for `workflow_id=` keyword callers
        return await db_get_workflow_script(workflow_id)
    raise ValueError('Either `content_hash` or `workflow_id` must be used as a search key')
253
254
255
def verify_database_uri(mongodb_address: str) -> str:
    """Validate ``mongodb_address`` as a MongoDB connection URI and return it unchanged.

    :param mongodb_address: the MongoDB URI to check.
    :return: ``mongodb_address`` itself, so the call can be used inline.
    :raises ValueError: if pymongo's URI parser rejects the address. Everything between
        ``://`` and the last ``@`` (the credentials part) is masked in the message, so
        passwords do not end up in logs.
    """
    try:
        # validate=True makes pymongo validate the URI options, not only the syntax
        mongo_uri_parser.parse_uri(uri=mongodb_address, validate=True)
    except Exception as error:
        # Never echo credentials: keep scheme and hosts, mask the user-info part.
        masked_address = re_sub(r"://.*@", "://****@", str(mongodb_address))
        raise ValueError(f"The MongoDB address '{masked_address}' is in wrong format, {error}") from error
    return mongodb_address
262
263
264
def verify_mongodb_available(mongo_url: str) -> None:
    """Check that a MongoDB server is reachable at ``mongo_url``.

    Creates a ``MongoClient`` with a 60 s server-selection timeout, sends ``ping`` to
    the ``admin`` database and always closes the client again afterwards.

    :param mongo_url: MongoDB connection URI.
    :raises RuntimeError: if the client cannot be created or the command fails. The
        credentials part of ``mongo_url`` (between ``://`` and the last ``@``) is
        masked in the message.
    """
    client = None
    try:
        client = MongoClient(mongo_url, serverSelectionTimeoutMS=60000.0)
        # "ping" is the lightweight check recommended by PyMongo (replaces deprecated "ismaster")
        client.admin.command("ping")
    except Exception as error:
        masked_url = re_sub(r"://.*@", "://****@", str(mongo_url))
        raise RuntimeError(f'Cannot connect to MongoDB: {masked_url}') from error
    finally:
        # Release the connection pool and monitor threads; the client is only a probe
        if client is not None:
            client.close()