1
|
|
|
from __future__ import annotations |
2
|
|
|
from typing import Dict, List |
3
|
|
|
from logging import DEBUG, getLogger, FileHandler |
4
|
|
|
|
5
|
|
|
from .database import db_get_processing_job, db_update_processing_job |
6
|
|
|
from .models import PYJobInput, StateEnum |
7
|
|
|
|
8
|
|
|
# Names exported by `from <this module> import *`.
__all__ = [
    'CacheLockedPages',
    'CacheProcessingRequests',
]
12
|
|
|
|
13
|
|
|
|
14
|
|
|
class CacheLockedPages:
    """In-memory bookkeeping of locked pages per workspace and output file group.

    The cache maps a workspace key to a dictionary whose keys are output file
    groups and whose values are lists of locked page ids. An empty list of
    page ids passed to :meth:`lock_pages` locks *all* pages of a file group,
    which is recorded with the single placeholder value
    ``self.placeholder_all_pages``.
    """

    def __init__(self) -> None:
        """Set up the logger and create the empty lock cache."""
        # Local import keeps this block self-contained (no new file-level import).
        from os.path import abspath

        self.log = getLogger("ocrd_network.server_cache.locked_pages")
        # TODO: remove this when refactoring the logging
        self.log.setLevel(DEBUG)
        log_path = abspath('/tmp/ocrd_processing_server_cache_locked_pages.log')
        # getLogger() hands back the same logger object for the same name, so
        # the file handler must be attached only once. Otherwise every new
        # instance would add another handler and duplicate each log record.
        already_attached = any(
            isinstance(handler, FileHandler) and handler.baseFilename == log_path
            for handler in self.log.handlers
        )
        if not already_attached:
            log_fh = FileHandler(log_path)
            log_fh.setLevel(DEBUG)
            self.log.addHandler(log_fh)

        # Used for keeping track of locked pages for a workspace
        # Key: `path_to_mets` if already resolved else `workspace_id`
        # Value: A dictionary where each dictionary key is the output file group,
        # and the values are list of strings representing the locked pages
        self.locked_pages: Dict[str, Dict[str, List[str]]] = {}
        # Used as a placeholder to lock all pages when no page_id is specified
        self.placeholder_all_pages: str = "all_pages"

    def check_if_locked_pages_for_output_file_grps(
        self,
        workspace_key: str,
        output_file_grps: List[str],
        page_ids: List[str]
    ) -> bool:
        """Tell whether a request collides with currently locked pages.

        Args:
            workspace_key: key of the workspace in the lock cache.
            output_file_grps: output file groups the request wants to write to.
            page_ids: pages the request wants to process. An empty list means
                "all pages", mirroring how :meth:`lock_pages` interprets it.

        Returns:
            True if, for at least one of the output file groups, all pages are
            locked, or any requested page is locked, or the request targets
            all pages while some page is locked. False otherwise.
        """
        if not self.locked_pages.get(workspace_key, None):
            self.log.debug(f"No entry found in the locked pages cache for workspace key: {workspace_key}")
            return False
        workspace_locks = self.locked_pages[workspace_key]
        for output_file_grp in output_file_grps:
            locked = workspace_locks.get(output_file_grp)
            if not locked:
                # Unknown file group or nothing locked in it - no conflict here
                continue
            conflict = (
                self.placeholder_all_pages in locked
                # A request without page ids targets every page, hence it
                # conflicts with any page that is currently locked.
                or not page_ids
                or not set(locked).isdisjoint(page_ids)
            )
            if conflict:
                self.log.debug("Caching the received request due to locked output file grp pages")
                return True
        return False

    def get_locked_pages(
        self,
        workspace_key: str
    ) -> Dict[str, List[str]]:
        """Return the lock dictionary of a workspace.

        Args:
            workspace_key: key of the workspace in the lock cache.

        Returns:
            The cached (not copied) mapping of output file group to locked
            page ids, or a new empty dictionary if the workspace has no entry.
        """
        if not self.locked_pages.get(workspace_key, None):
            self.log.debug(f"No locked pages available for workspace key: {workspace_key}")
            return {}
        return self.locked_pages[workspace_key]

    def lock_pages(
        self,
        workspace_key: str,
        output_file_grps: List[str],
        page_ids: List[str]
    ) -> None:
        """Lock pages of the given output file groups.

        Args:
            workspace_key: key of the workspace in the lock cache; an entry is
                created when missing.
            output_file_grps: output file groups whose pages get locked.
            page_ids: pages to lock. When empty, all pages are locked by
                storing the placeholder value instead of individual ids.
        """
        if not self.locked_pages.get(workspace_key, None):
            self.log.debug(f"No entry found in the locked pages cache for workspace key: {workspace_key}")
            self.log.debug(f"Creating an entry in the locked pages cache for workspace key: {workspace_key}")
            self.locked_pages[workspace_key] = {}

        workspace_locks = self.locked_pages[workspace_key]
        for output_file_grp in output_file_grps:
            if output_file_grp not in workspace_locks:
                self.log.debug(f"Creating an empty list for output file grp: {output_file_grp}")
                workspace_locks[output_file_grp] = []
            # The page id list is not empty - only some pages are in the request
            if page_ids:
                self.log.debug(f"Locking pages for `{output_file_grp}`: {page_ids}")
                workspace_locks[output_file_grp].extend(page_ids)
                self.log.debug(f"Locked pages of `{output_file_grp}`: "
                               f"{workspace_locks[output_file_grp]}")
            else:
                # Lock all pages with a single value
                self.log.debug(f"Locking pages for `{output_file_grp}`: {self.placeholder_all_pages}")
                workspace_locks[output_file_grp].append(self.placeholder_all_pages)

    def unlock_pages(
        self,
        workspace_key: str,
        output_file_grps: List[str],
        page_ids: List[str]
    ) -> None:
        """Release previously locked pages of the given output file groups.

        Unknown workspace keys and unknown output file groups are ignored.

        Args:
            workspace_key: key of the workspace in the lock cache.
            output_file_grps: output file groups whose pages get unlocked.
            page_ids: pages to unlock. When empty, the "all pages" placeholder
                is removed (one occurrence); if it is not present, nothing
                happens.
        """
        if not self.locked_pages.get(workspace_key, None):
            self.log.debug(f"No entry found in the locked pages cache for workspace key: {workspace_key}")
            return
        workspace_locks = self.locked_pages[workspace_key]
        for output_file_grp in output_file_grps:
            if output_file_grp not in workspace_locks:
                continue
            if page_ids:
                # Unlock the previously locked pages
                self.log.debug(f"Unlocking pages of `{output_file_grp}`: {page_ids}")
                workspace_locks[output_file_grp] = [
                    page for page in workspace_locks[output_file_grp] if page not in page_ids
                ]
                self.log.debug(f"Remaining locked pages of `{output_file_grp}`: "
                               f"{workspace_locks[output_file_grp]}")
            elif self.placeholder_all_pages in workspace_locks[output_file_grp]:
                # Remove the single variable used to indicate all pages are locked
                self.log.debug(f"Unlocking all pages for: {output_file_grp}")
                workspace_locks[output_file_grp].remove(self.placeholder_all_pages)
            else:
                # list.remove() would raise ValueError here; a repeated or
                # mismatched unlock is not worth failing the caller for.
                self.log.debug(f"No all-pages lock to remove for: {output_file_grp}")
107
|
|
|
|
108
|
|
|
|
109
|
|
|
class CacheProcessingRequests:
    """In-memory buffer of processing requests plus a per-workspace job counter.

    Requests are kept in per-workspace lists (used as queues) until their job
    dependencies are satisfied or until they get cancelled.
    """

    def __init__(self) -> None:
        """Set up the logger and create the empty request and counter caches."""
        # Local import keeps this block self-contained (no new file-level import).
        from os.path import abspath

        self.log = getLogger("ocrd_network.server_cache.processing_requests")
        # TODO: remove this when refactoring the logging
        self.log.setLevel(DEBUG)
        log_path = abspath('/tmp/ocrd_processing_server_cache_processing_requests.log')
        # getLogger() hands back the same logger object for the same name, so
        # the file handler must be attached only once. Otherwise every new
        # instance would add another handler and duplicate each log record.
        already_attached = any(
            isinstance(handler, FileHandler) and handler.baseFilename == log_path
            for handler in self.log.handlers
        )
        if not already_attached:
            log_fh = FileHandler(log_path)
            log_fh.setLevel(DEBUG)
            self.log.addHandler(log_fh)

        # Used for buffering/caching processing requests in the Processing Server
        # Key: `path_to_mets` if already resolved else `workspace_id`
        # Value: Queue that holds PYInputJob elements
        self.processing_requests: Dict[str, List[PYJobInput]] = {}

        # Used for tracking of active processing jobs for a workspace to decide
        # when to shut down a METS Server instance for that workspace
        # Key: `path_to_mets` if already resolved else `workspace_id`
        # Value: integer which holds the amount of jobs pushed to the RabbitMQ
        # but no internal callback was yet invoked
        self.__processing_counter: Dict[str, int] = {}

    @staticmethod
    async def __check_if_job_deps_met(dependencies: List[str]) -> bool:
        """Return True only if every dependency job is in the success state.

        A dependency whose lookup raises ``ValueError`` counts as not met.
        """
        # Check the states of all dependent jobs
        for dependency_job_id in dependencies:
            try:
                dependency_job_state = (await db_get_processing_job(dependency_job_id)).state
            except ValueError:
                # job_id not (yet) in db. Dependency not met
                return False
            # Found a dependent job whose state is not success
            if dependency_job_state != StateEnum.success:
                return False
        return True

    async def consume_cached_requests(self, workspace_key: str) -> List[PYJobInput]:
        """Remove and return the cached requests that are ready to be processed.

        A request is ready when it has no dependencies or when all of its
        dependency jobs have succeeded.

        Args:
            workspace_key: key of the workspace in the requests cache.

        Returns:
            The requests removed from the cache, in queue order. Empty when
            nothing is cached for the workspace or nothing is ready.
        """
        if not self.has_workspace_cached_requests(workspace_key=workspace_key):
            self.log.debug(f"No jobs to be consumed for workspace key: {workspace_key}")
            return []
        found_consume_requests = []
        # Iterate over a snapshot: the loop awaits, so the underlying list may
        # be modified by other coroutines while the dependencies are checked.
        for current_element in list(self.processing_requests[workspace_key]):
            # Request has other job dependencies
            if current_element.depends_on:
                satisfied_dependencies = await self.__check_if_job_deps_met(current_element.depends_on)
                if not satisfied_dependencies:
                    continue
            found_consume_requests.append(current_element)
        found_requests = []
        for found_element in found_consume_requests:
            try:
                self.processing_requests[workspace_key].remove(found_element)
            except ValueError:
                # The ValueError is not an issue since the
                # element was removed by another instance
                continue
            self.log.debug(f"Found cached request: {found_element.processor_name}, {found_element.page_id}, "
                           f"{found_element.job_id}, depends_on: {found_element.depends_on}")
            found_requests.append(found_element)
        return found_requests

    def update_request_counter(self, workspace_key: str, by_value: int) -> int:
        """
        A method used to increase/decrease the internal counter of some workspace_key by `by_value`.
        Returns the value of the updated counter.
        """
        # If a record counter of this workspace key does not exist
        # in the requests counter cache yet, create one and assign 0.
        # Key presence is checked (not truthiness), so an existing counter
        # that dropped to 0 is not reported as newly created.
        if workspace_key not in self.__processing_counter:
            self.log.debug(f"Creating an internal request counter for workspace key: {workspace_key}")
            self.__processing_counter[workspace_key] = 0
        self.__processing_counter[workspace_key] += by_value
        return self.__processing_counter[workspace_key]

    def cache_request(self, workspace_key: str, data: PYJobInput):
        """Append a processing request to the queue of the given workspace.

        Args:
            workspace_key: key of the workspace in the requests cache; a queue
                is created when missing.
            data: the processing request to be cached.
        """
        # If a record queue of this workspace key does not exist in the requests cache
        if workspace_key not in self.processing_requests:
            self.log.debug(f"Creating an internal request queue for workspace_key: {workspace_key}")
            self.processing_requests[workspace_key] = []
        self.log.debug(f"Caching request: {data.processor_name}, {data.page_id}, "
                       f"{data.job_id}, depends_on: {data.depends_on}")
        # Add the processing request to the end of the internal queue
        self.processing_requests[workspace_key].append(data)

    async def cancel_dependent_jobs(self, workspace_key: str, processing_job_id: str) -> List[PYJobInput]:
        """Cancel all cached requests that depend on the given job, transitively.

        Each cancelled request is removed from the cache and its job state is
        set to cancelled in the database.

        Args:
            workspace_key: key of the workspace in the requests cache.
            processing_job_id: id of the job whose dependents get cancelled.

        Returns:
            The cancelled requests, including recursively cancelled ones.
        """
        if not self.has_workspace_cached_requests(workspace_key=workspace_key):
            self.log.debug(f"No jobs to be cancelled for workspace key: {workspace_key}")
            return []
        self.log.debug(f"Cancelling jobs dependent on job id: {processing_job_id}")
        # `depends_on` is treated as optional elsewhere in this class, so
        # guard against a falsy value (presumably None) before the `in` test.
        found_cancel_requests = [
            current_element
            for current_element in self.processing_requests[workspace_key]
            if processing_job_id in (current_element.depends_on or ())
        ]
        cancelled_jobs = []
        for cancel_element in found_cancel_requests:
            # NOTE(review): the try block also covers the db update and the
            # recursion, so a ValueError raised there is skipped as well -
            # kept as in the original; confirm whether that is intended.
            try:
                self.processing_requests[workspace_key].remove(cancel_element)
                self.log.debug(f"For job id: `{processing_job_id}`, "
                               f"cancelling: {cancel_element.job_id}")
                cancelled_jobs.append(cancel_element)
                await db_update_processing_job(job_id=cancel_element.job_id, state=StateEnum.cancelled)
                # Recursively cancel dependent jobs for the cancelled job
                recursively_cancelled = await self.cancel_dependent_jobs(
                    workspace_key=workspace_key,
                    processing_job_id=cancel_element.job_id
                )
                # Add the recursively cancelled jobs to the main list of cancelled jobs
                cancelled_jobs.extend(recursively_cancelled)
            except ValueError:
                # The ValueError is not an issue since the
                # element was removed by another instance
                continue
        return cancelled_jobs

    async def is_caching_required(self, job_dependencies: List[str]) -> bool:
        """Return True if at least one of the given job dependencies is unmet.

        Args:
            job_dependencies: ids of the jobs a request depends on.

        Returns:
            False when there are no dependencies or all of them succeeded,
            True otherwise.
        """
        if not job_dependencies:
            # no dependencies found
            return False
        if await self.__check_if_job_deps_met(job_dependencies):
            # all dependencies are met
            return False
        return True

    def has_workspace_cached_requests(self, workspace_key: str) -> bool:
        """Return True if at least one request is cached for the workspace.

        Args:
            workspace_key: key of the workspace in the requests cache.
        """
        # Check key presence first so that the "empty queue" case below is
        # reachable and logged with its own message.
        if workspace_key not in self.processing_requests:
            self.log.debug(f"In processing requests cache, no workspace key found: {workspace_key}")
            return False
        if not self.processing_requests[workspace_key]:
            self.log.debug(f"The processing requests cache is empty for workspace key: {workspace_key}")
            return False
        return True
240
|
|
|
|