Passed
Pull Request — master (#974)
by Konstantin
02:57
created

ocrd_network.database   A

Complexity

Total Complexity 18

Size/Duplication

Total Lines 95
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 18
eloc 58
dl 0
loc 95
rs 10
c 0
b 0
f 0

8 Functions

Rating   Name   Duplication   Size   Complexity  
A initiate_database() 0 5 1
A sync_db_update_processing_job() 0 3 1
A sync_initiate_database() 0 3 1
C db_update_processing_job() 0 25 9
A db_get_workspace() 0 7 2
A db_get_processing_job() 0 6 2
A sync_db_get_workspace() 0 3 1
A sync_db_get_processing_job() 0 3 1
""" The database is used to store information regarding jobs and workspaces.

Jobs: for every process-request a job is inserted into the database with a uuid, status and
information about the process like parameters and file groups. It is mainly used to track the status
(`ocrd_network.models.job.StateEnum`) of a job so that the state of a job can be queried. Finished
jobs are not deleted from the database.

Workspaces: A job or a processor always runs on a workspace. So a processor needs the information
where the workspace is available. This information can be set with providing an absolute path or a
workspace_id. With the latter, the database is used to convert the workspace_id to a path.

XXX: Currently the information is not preserved after the processing-server shuts down as the
database (runs in docker) currently has no volume set.
"""
15
from beanie import init_beanie
16
from motor.motor_asyncio import AsyncIOMotorClient
17
18
from .models import (
19
    DBProcessorJob,
20
    DBWorkspace
21
)
22
from .utils import call_sync
23
24
25
async def initiate_database(db_url: str):
    """Open a MongoDB connection to ``db_url`` and register the beanie document models.

    Uses the database named in the connection URL, falling back to ``ocrd``
    when the URL does not name one.
    """
    motor_client = AsyncIOMotorClient(db_url)
    target_db = motor_client.get_default_database(default='ocrd')
    await init_beanie(
        database=target_db,
        document_models=[DBProcessorJob, DBWorkspace],
    )
31
32
33
@call_sync
async def sync_initiate_database(db_url: str):
    """Blocking variant of :func:`initiate_database` (wrapped by ``call_sync``)."""
    await initiate_database(db_url)
36
37
38
async def db_get_workspace(workspace_id: str) -> DBWorkspace:
    """Resolve ``workspace_id`` to its :class:`DBWorkspace` document.

    Raises:
        ValueError: if no workspace with that id exists in the database.
    """
    db_entry = await DBWorkspace.find_one(DBWorkspace.workspace_id == workspace_id)
    if db_entry is None:
        raise ValueError(f'Workspace with id "{workspace_id}" not in the DB.')
    return db_entry
45
46
47
@call_sync
async def sync_db_get_workspace(workspace_id: str) -> DBWorkspace:
    """Blocking variant of :func:`db_get_workspace` (wrapped by ``call_sync``)."""
    db_entry = await db_get_workspace(workspace_id)
    return db_entry
50
51
52
async def db_get_processing_job(job_id: str) -> DBProcessorJob:
    """Resolve ``job_id`` to its :class:`DBProcessorJob` document.

    Raises:
        ValueError: if no processing job with that id exists in the database.
    """
    db_entry = await DBProcessorJob.find_one(DBProcessorJob.job_id == job_id)
    if db_entry is None:
        raise ValueError(f'Processing job with id "{job_id}" not in the DB.')
    return db_entry
58
59
60
@call_sync
async def sync_db_get_processing_job(job_id: str) -> DBProcessorJob:
    """Blocking variant of :func:`db_get_processing_job` (wrapped by ``call_sync``)."""
    db_entry = await db_get_processing_job(job_id)
    return db_entry
63
64
65
async def db_update_processing_job(job_id: str, **kwargs):
    """Update selected fields of the processing job identified by ``job_id``.

    Args:
        job_id: id of the :class:`DBProcessorJob` document to update.
        **kwargs: field/value pairs to set on the job. Only the fields in
            the whitelist below ('state', 'start_time', 'end_time',
            'path_to_mets', 'exec_time') may be changed.

    Raises:
        ValueError: if no job with ``job_id`` exists, if a key is not a
            field of the job document at all ("not available"), or if it is
            a field that must not be changed after creation ("not updatable").
    """
    job = await DBProcessorJob.find_one(DBProcessorJob.job_id == job_id)
    if not job:
        raise ValueError(f'Processing job with id "{job_id}" not in the DB.')

    # Whitelist of fields callers are allowed to modify after job creation;
    # replaces the previous per-key if/elif chain with a single setattr.
    updatable_fields = {'state', 'start_time', 'end_time', 'path_to_mets', 'exec_time'}
    job_keys = job.__dict__.keys()
    for key, value in kwargs.items():
        # Keep the original two-stage error reporting: unknown field first,
        # then known-but-frozen field.
        if key not in job_keys:
            raise ValueError(f'Field "{key}" is not available.')
        if key not in updatable_fields:
            raise ValueError(f'Field "{key}" is not updatable.')
        setattr(job, key, value)
    await job.save()
90
91
92
@call_sync
async def sync_db_update_processing_job(job_id: str, **kwargs):
    """Blocking variant of :func:`db_update_processing_job` (wrapped by ``call_sync``)."""
    await db_update_processing_job(job_id=job_id, **kwargs)
95