from __future__ import annotations
from typing import Dict, List
from logging import DEBUG, getLogger, FileHandler

from .database import db_get_processing_job, db_update_processing_job
from .models import PYJobInput, StateEnum

__all__ = [
    'CacheLockedPages',
    'CacheProcessingRequests'
]


class CacheLockedPages:
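    """Cache of per-workspace page locks.

    Keeps track of which pages are locked per output file group for each
    workspace, so that concurrent processing requests touching the same
    pages can be cached and deferred instead of conflicting.
    """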
    def __init__(self) -> None:
        self.log = getLogger("ocrd_network.server_cache.locked_pages")
        # TODO: remove this when refactoring the logging
        self.log.setLevel(DEBUG)
        log_fh = FileHandler('/tmp/ocrd_processing_server_cache_locked_pages.log')
        log_fh.setLevel(DEBUG)
        self.log.addHandler(log_fh)

        # Used for keeping track of locked pages for a workspace
        # Key: `path_to_mets` if already resolved else `workspace_id`
        # Value: A dictionary where each key is an output file group,
        # and the values are lists of strings representing the locked pages
        self.locked_pages: Dict[str, Dict[str, List[str]]] = {}
        # Used as a placeholder to lock all pages when no page_id is specified
        self.placeholder_all_pages: str = "all_pages"

    def check_if_locked_pages_for_output_file_grps(
        self,
        workspace_key: str,
        output_file_grps: List[str],
        page_ids: List[str]
    ) -> bool:
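        """Check whether any of the given pages are locked.

        Returns True if, for any group in `output_file_grps`, the all-pages
        placeholder or any of `page_ids` is locked under `workspace_key`.
        """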
        if not self.locked_pages.get(workspace_key, None):
            self.log.debug(f"No entry found in the locked pages cache for workspace key: {workspace_key}")
            return False
        for output_fileGrp in output_file_grps:
            if output_fileGrp in self.locked_pages[workspace_key]:
                if self.placeholder_all_pages in self.locked_pages[workspace_key][output_fileGrp]:
                    self.log.debug("Caching the received request due to an all-pages lock")
                    return True
                if not set(self.locked_pages[workspace_key][output_fileGrp]).isdisjoint(page_ids):
                    self.log.debug("Caching the received request due to locked output file grp pages")
                    return True
        return False

    def get_locked_pages(
        self,
        workspace_key: str
    ) -> Dict[str, List[str]]:
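        """Return the locked pages of `workspace_key`, or an empty dict if none."""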
        if not self.locked_pages.get(workspace_key, None):
            self.log.debug(f"No locked pages available for workspace key: {workspace_key}")
            return {}
        return self.locked_pages[workspace_key]

    def lock_pages(
        self,
        workspace_key: str,
        output_file_grps: List[str],
        page_ids: List[str]
    ) -> None:
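        """Lock `page_ids` of each output file group for `workspace_key`.

        If `page_ids` is empty, all pages are locked by appending the
        all-pages placeholder instead of individual page ids.
        """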
        if not self.locked_pages.get(workspace_key, None):
            self.log.debug(f"No entry found in the locked pages cache for workspace key: {workspace_key}")
            self.log.debug(f"Creating an entry in the locked pages cache for workspace key: {workspace_key}")
            self.locked_pages[workspace_key] = {}

        for output_fileGrp in output_file_grps:
            if output_fileGrp not in self.locked_pages[workspace_key]:
                self.log.debug(f"Creating an empty list for output file grp: {output_fileGrp}")
                self.locked_pages[workspace_key][output_fileGrp] = []
            # The page id list is not empty - only some pages are in the request
            if page_ids:
                self.log.debug(f"Locking pages for `{output_fileGrp}`: {page_ids}")
                self.locked_pages[workspace_key][output_fileGrp].extend(page_ids)
                self.log.debug(f"Locked pages of `{output_fileGrp}`: "
                               f"{self.locked_pages[workspace_key][output_fileGrp]}")
            else:
                # Lock all pages with a single placeholder value
                self.log.debug(f"Locking pages for `{output_fileGrp}`: {self.placeholder_all_pages}")
                self.locked_pages[workspace_key][output_fileGrp].append(self.placeholder_all_pages)

    def unlock_pages(
        self,
        workspace_key: str,
        output_file_grps: List[str],
        page_ids: List[str]
    ) -> None:
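        """Unlock `page_ids` of each output file group for `workspace_key`.

        If `page_ids` is empty, the all-pages placeholder is removed instead.
        """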
        if not self.locked_pages.get(workspace_key, None):
            self.log.debug(f"No entry found in the locked pages cache for workspace key: {workspace_key}")
            return
        for output_fileGrp in output_file_grps:
            if output_fileGrp in self.locked_pages[workspace_key]:
                if page_ids:
                    # Unlock the previously locked pages
                    self.log.debug(f"Unlocking pages of `{output_fileGrp}`: {page_ids}")
                    self.locked_pages[workspace_key][output_fileGrp] = \
                        [x for x in self.locked_pages[workspace_key][output_fileGrp] if x not in page_ids]
                    self.log.debug(f"Remaining locked pages of `{output_fileGrp}`: "
                                   f"{self.locked_pages[workspace_key][output_fileGrp]}")
                else:
                    # Remove the placeholder value used to indicate that all pages are locked
                    self.log.debug(f"Unlocking all pages for: {output_fileGrp}")
                    if self.placeholder_all_pages in self.locked_pages[workspace_key][output_fileGrp]:
                        self.locked_pages[workspace_key][output_fileGrp].remove(self.placeholder_all_pages)


class CacheProcessingRequests:
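    """Cache for buffering processing requests in the Processing Server.

    Requests whose job dependencies are not yet met are cached per
    workspace and consumed once their dependencies have succeeded. An
    internal counter tracks the jobs pushed to RabbitMQ whose callback
    is still pending, to decide when the METS Server instance of a
    workspace can be shut down.
    """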
    def __init__(self) -> None:
        self.log = getLogger("ocrd_network.server_cache.processing_requests")
        # TODO: remove this when refactoring the logging
        self.log.setLevel(DEBUG)
        log_fh = FileHandler('/tmp/ocrd_processing_server_cache_processing_requests.log')
        log_fh.setLevel(DEBUG)
        self.log.addHandler(log_fh)

        # Used for buffering/caching processing requests in the Processing Server
        # Key: `path_to_mets` if already resolved else `workspace_id`
        # Value: a list that holds PYJobInput elements
        self.processing_requests: Dict[str, List[PYJobInput]] = {}

        # Used for tracking active processing jobs for a workspace to decide
        # when to shut down the METS Server instance for that workspace
        # Key: `path_to_mets` if already resolved else `workspace_id`
        # Value: integer holding the number of jobs pushed to RabbitMQ
        # for which no internal callback has been invoked yet
        self.__processing_counter: Dict[str, int] = {}

    @staticmethod
    async def __check_if_job_deps_met(dependencies: List[str]) -> bool:
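        """Return True only if all jobs in `dependencies` have finished successfully."""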
        # Check the states of all dependent jobs
        for dependency_job_id in dependencies:
            try:
                dependency_job_state = (await db_get_processing_job(dependency_job_id)).state
            except ValueError:
                # job_id not (yet) in the database: dependency not met
                return False
            # Found a dependent job whose state is not success
            if dependency_job_state != StateEnum.success:
                return False
        return True

    async def consume_cached_requests(self, workspace_key: str) -> List[PYJobInput]:
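        """Remove and return all cached requests of `workspace_key` whose dependencies are met."""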
        if not self.has_workspace_cached_requests(workspace_key=workspace_key):
            self.log.debug(f"No jobs to be consumed for workspace key: {workspace_key}")
            return []
        found_consume_requests = []
        for current_element in self.processing_requests[workspace_key]:
            # Request has other job dependencies
            if current_element.depends_on:
                satisfied_dependencies = await self.__check_if_job_deps_met(current_element.depends_on)
                if not satisfied_dependencies:
                    continue
            found_consume_requests.append(current_element)
        found_requests = []
        for found_element in found_consume_requests:
            try:
                self.processing_requests[workspace_key].remove(found_element)
                self.log.debug(f"Found cached request: {found_element.processor_name}, {found_element.page_id}, "
                               f"{found_element.job_id}, depends_on: {found_element.depends_on}")
                found_requests.append(found_element)
            except ValueError:
                # Not an issue: the element was already removed by another instance
                continue
        return found_requests

    def update_request_counter(self, workspace_key: str, by_value: int) -> int:
        """
        Increase/decrease the internal counter of `workspace_key` by `by_value`.
        Returns the value of the updated counter.
        """
        # If no counter record exists for this workspace key
        # in the requests counter cache yet, create one and assign 0
        if not self.__processing_counter.get(workspace_key, None):
            self.log.debug(f"Creating an internal request counter for workspace key: {workspace_key}")
            self.__processing_counter[workspace_key] = 0
        self.__processing_counter[workspace_key] += by_value
        return self.__processing_counter[workspace_key]

    def cache_request(self, workspace_key: str, data: PYJobInput) -> None:
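        """Append the processing request `data` to the internal queue of `workspace_key`."""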
        # If no request queue exists for this workspace key in the requests cache, create one
        if not self.processing_requests.get(workspace_key, None):
            self.log.debug(f"Creating an internal request queue for workspace_key: {workspace_key}")
            self.processing_requests[workspace_key] = []
        self.log.debug(f"Caching request: {data.processor_name}, {data.page_id}, "
                       f"{data.job_id}, depends_on: {data.depends_on}")
        # Add the processing request to the end of the internal queue
        self.processing_requests[workspace_key].append(data)

    async def cancel_dependent_jobs(self, workspace_key: str, processing_job_id: str) -> List[PYJobInput]:
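        """Cancel all cached requests that depend, directly or transitively, on `processing_job_id`.

        Returns the list of cancelled requests.
        """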
        if not self.has_workspace_cached_requests(workspace_key=workspace_key):
            self.log.debug(f"No jobs to be cancelled for workspace key: {workspace_key}")
            return []
        self.log.debug(f"Cancelling jobs dependent on job id: {processing_job_id}")
        found_cancel_requests = []
        for current_element in self.processing_requests[workspace_key]:
            if processing_job_id in current_element.depends_on:
                found_cancel_requests.append(current_element)
        cancelled_jobs = []
        for cancel_element in found_cancel_requests:
            try:
                self.processing_requests[workspace_key].remove(cancel_element)
                self.log.debug(f"For job id: `{processing_job_id}`, "
                               f"cancelling: {cancel_element.job_id}")
                cancelled_jobs.append(cancel_element)
                await db_update_processing_job(job_id=cancel_element.job_id, state=StateEnum.cancelled)
                # Recursively cancel jobs dependent on the cancelled job
                recursively_cancelled = await self.cancel_dependent_jobs(
                    workspace_key=workspace_key,
                    processing_job_id=cancel_element.job_id
                )
                # Add the recursively cancelled jobs to the main list of cancelled jobs
                cancelled_jobs.extend(recursively_cancelled)
            except ValueError:
                # Not an issue: the element was already removed by another instance
                continue
        return cancelled_jobs

    async def is_caching_required(self, job_dependencies: List[str]) -> bool:
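        """Return True if `job_dependencies` is non-empty and not all dependencies are met yet."""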
        if not job_dependencies:
            # no dependencies found
            return False
        if await self.__check_if_job_deps_met(job_dependencies):
            # all dependencies are met
            return False
        return True

    def has_workspace_cached_requests(self, workspace_key: str) -> bool:
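        """Return True if there are cached requests for `workspace_key`."""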
        if not self.processing_requests.get(workspace_key, None):
            self.log.debug(f"No workspace key found in the processing requests cache: {workspace_key}")
            return False
        if not self.processing_requests[workspace_key]:
            self.log.debug(f"The processing requests cache is empty for workspace key: {workspace_key}")
            return False
        return True