|
1
|
|
|
""" The database is used to store information regarding jobs and workspaces.

Jobs: for every process-request a job is inserted into the database with a uuid, status and
information about the process like parameters and file groups. It is mainly used to track the status
(`ocrd_network.models.job.StateEnum`) of a job so that the state of a job can be queried. Finished
jobs are not deleted from the database.

Workspaces: A job or a processor always runs on a workspace, so a processor needs to know where the
workspace is located. This location can be provided either as an absolute path or as a
workspace_id. In the latter case, the database is used to resolve the workspace_id to a path.

XXX: Currently the information is not preserved after the processing-server shuts down, because the
database (running in Docker) has no volume configured.
"""
|
15
|
|
|
from beanie import init_beanie |
|
16
|
|
|
from motor.motor_asyncio import AsyncIOMotorClient |
|
17
|
|
|
|
|
18
|
|
|
from .models import ( |
|
19
|
|
|
DBProcessorJob, |
|
20
|
|
|
DBWorkspace |
|
21
|
|
|
) |
|
22
|
|
|
from .utils import call_sync |
|
23
|
|
|
|
|
24
|
|
|
|
|
25
|
|
|
async def initiate_database(db_url: str):
    """Connect to MongoDB at ``db_url`` and register the beanie document models.

    The database is taken from ``db_url``; if the URL does not name one, the
    ``ocrd`` database is used (per ``get_default_database(default='ocrd')``).
    The processor-job and workspace models are registered with beanie.
    """
    mongo_client = AsyncIOMotorClient(db_url)
    target_db = mongo_client.get_default_database(default='ocrd')
    registered_models = [DBProcessorJob, DBWorkspace]
    await init_beanie(database=target_db, document_models=registered_models)
|
31
|
|
|
|
|
32
|
|
|
|
|
33
|
|
|
@call_sync
async def sync_initiate_database(db_url: str):
    """Synchronous entry point for ``initiate_database``.

    The coroutine is wrapped by ``call_sync`` from ``.utils``; presumably that
    decorator runs it to completion for synchronous callers (TODO confirm).
    """
    await initiate_database(db_url=db_url)
|
36
|
|
|
|
|
37
|
|
|
|
|
38
|
|
|
async def db_get_workspace(workspace_id: str) -> DBWorkspace:
    """Look up the stored workspace whose ``workspace_id`` matches.

    :param workspace_id: id of the workspace to fetch
    :returns: the matching ``DBWorkspace`` document
    :raises ValueError: if no workspace with that id is in the database
    """
    found = await DBWorkspace.find_one(DBWorkspace.workspace_id == workspace_id)
    if found:
        return found
    raise ValueError(f'Workspace with id "{workspace_id}" not in the DB.')
|
45
|
|
|
|
|
46
|
|
|
|
|
47
|
|
|
@call_sync
async def sync_db_get_workspace(workspace_id: str) -> DBWorkspace:
    """Synchronous entry point for ``db_get_workspace`` (wrapped by ``call_sync``).

    :raises ValueError: propagated when the workspace does not exist
    """
    found = await db_get_workspace(workspace_id)
    return found
|
50
|
|
|
|
|
51
|
|
|
|
|
52
|
|
|
async def db_get_processing_job(job_id: str) -> DBProcessorJob:
    """Look up the stored processing job whose ``job_id`` matches.

    :param job_id: id of the processing job to fetch
    :returns: the matching ``DBProcessorJob`` document
    :raises ValueError: if no processing job with that id is in the database
    """
    found = await DBProcessorJob.find_one(DBProcessorJob.job_id == job_id)
    if found:
        return found
    raise ValueError(f'Processing job with id "{job_id}" not in the DB.')
|
58
|
|
|
|
|
59
|
|
|
|
|
60
|
|
|
@call_sync
async def sync_db_get_processing_job(job_id: str) -> DBProcessorJob:
    """Synchronous entry point for ``db_get_processing_job`` (wrapped by ``call_sync``).

    :raises ValueError: propagated when the processing job does not exist
    """
    found = await db_get_processing_job(job_id)
    return found
|
63
|
|
|
|
|
64
|
|
|
|
|
65
|
|
|
# Fields of a stored processing job that db_update_processing_job may change.
_UPDATABLE_PROCESSING_JOB_FIELDS = frozenset(
    {'state', 'start_time', 'end_time', 'path_to_mets', 'exec_time'}
)


async def db_update_processing_job(job_id: str, **kwargs):
    """Update selected fields of a stored processing job and save it.

    Every keyword argument is validated before any field is assigned, so an
    invalid key never leaves the job partially modified.

    :param job_id: id of the processing job to update
    :param kwargs: field names mapped to their new values; only ``state``,
        ``start_time``, ``end_time``, ``path_to_mets`` and ``exec_time`` may
        be updated
    :raises ValueError: if no job with ``job_id`` exists, if a key is not a
        field of the job ("not available"), or if a key names an existing
        field that may not be changed ("not updatable")
    """
    # Same lookup and same "not in the DB" error as the getter.
    job = await db_get_processing_job(job_id)

    # Validate in the caller's order, so the first offending key is the one
    # reported. The availability check comes before the updatability check.
    for key in kwargs:
        if key not in job.__dict__:
            raise ValueError(f'Field "{key}" is not available.')
        if key not in _UPDATABLE_PROCESSING_JOB_FIELDS:
            raise ValueError(f'Field "{key}" is not updatable.')

    for key, value in kwargs.items():
        setattr(job, key, value)
    await job.save()
|
90
|
|
|
|
|
91
|
|
|
|
|
92
|
|
|
@call_sync
async def sync_db_update_processing_job(job_id: str, **kwargs):
    """Synchronous entry point for ``db_update_processing_job`` (wrapped by ``call_sync``).

    :raises ValueError: propagated from ``db_update_processing_job``
    """
    await db_update_processing_job(job_id, **kwargs)
|
95
|
|
|
|