1
|
|
|
from __future__ import annotations |
2
|
|
|
from typing import Dict, List |
3
|
|
|
|
4
|
|
|
from ocrd_utils import getLogger |
5
|
|
|
from .constants import JobState, SERVER_ALL_PAGES_PLACEHOLDER |
6
|
|
|
from .database import db_get_processing_job, db_update_processing_job |
7
|
|
|
from .logging_utils import ( |
8
|
|
|
configure_file_handler_with_formatter, |
9
|
|
|
get_cache_locked_pages_logging_file_path, |
10
|
|
|
get_cache_processing_requests_logging_file_path |
11
|
|
|
) |
12
|
|
|
from .models import PYJobInput |
13
|
|
|
from .utils import call_sync |
14
|
|
|
|
15
|
|
|
|
16
|
|
|
class CacheLockedPages:
    """
    In-memory registry of locked pages, grouped by workspace key and output file group.

    A lock on "all pages" of an output file group is represented by a single
    placeholder entry (`placeholder_all_pages`) in that group's list of locked pages.
    """

    def __init__(self) -> None:
        self.log = getLogger("ocrd_network.server_cache.locked_pages")
        log_file = get_cache_locked_pages_logging_file_path()
        configure_file_handler_with_formatter(self.log, log_file=log_file, mode="a")

        # Used for keeping track of locked pages for a workspace
        # Key: `path_to_mets` if already resolved else `workspace_id`
        # Value: A dictionary where each dictionary key is the output file group,
        # and the values are list of strings representing the locked pages
        self.locked_pages: Dict[str, Dict[str, List[str]]] = {}
        # Used as a placeholder to lock all pages when no page_id is specified
        self.placeholder_all_pages: str = SERVER_ALL_PAGES_PLACEHOLDER

    def check_if_locked_pages_for_output_file_grps(
        self, workspace_key: str, output_file_grps: List[str], page_ids: List[str]
    ) -> bool:
        """
        Tell whether a request conflicts with the currently locked pages.

        :param workspace_key: key of the workspace in the locked pages cache
        :param output_file_grps: output file groups the request writes to
        :param page_ids: pages of the request; an empty value means "all pages",
            mirroring how `lock_pages` interprets it
        :return: True if any of the output file groups has a conflicting lock
        """
        locked_file_groups = self.locked_pages.get(workspace_key)
        if not locked_file_groups:
            self.log.info(f"No entry found in the locked pages cache for workspace key: {workspace_key}")
            return False
        debug_message = "Caching the received request due to locked output file grp pages."
        for file_group in output_file_grps:
            currently_locked = locked_file_groups.get(file_group)
            if not currently_locked:
                # Unknown file group, or all of its pages were unlocked again
                continue
            if self.placeholder_all_pages in currently_locked:
                # Every page of this file group is locked
                self.log.debug(debug_message)
                return True
            if not page_ids:
                # The request targets all pages, and at least one page is locked
                self.log.debug(debug_message)
                return True
            if not set(currently_locked).isdisjoint(page_ids):
                self.log.debug(debug_message)
                return True
        return False

    def get_locked_pages(self, workspace_key: str) -> Dict[str, List[str]]:
        """
        Return the mapping of output file group to locked pages for a workspace key.

        An empty dictionary is returned when nothing is cached for the key. Otherwise
        the internal dictionary itself (not a copy) is returned.
        """
        locked_file_groups = self.locked_pages.get(workspace_key)
        if not locked_file_groups:
            self.log.info(f"No locked pages available for workspace key: {workspace_key}")
            return {}
        return locked_file_groups

    def lock_pages(self, workspace_key: str, output_file_grps: List[str], page_ids: List[str]) -> None:
        """
        Lock `page_ids` in each of the `output_file_grps` of the workspace.

        When `page_ids` is empty, the all-pages placeholder is stored instead,
        which locks every page of the file group.
        """
        if not self.locked_pages.get(workspace_key):
            self.log.info(f"No entry found in the locked pages cache for workspace key: {workspace_key}")
            self.log.info(f"Creating an entry in the locked pages cache for workspace key: {workspace_key}")
            self.locked_pages[workspace_key] = {}
        locked_file_groups = self.locked_pages[workspace_key]
        for file_group in output_file_grps:
            if file_group not in locked_file_groups:
                self.log.info(f"Creating an empty list for output file grp: {file_group}")
                locked_file_groups[file_group] = []
            if page_ids:
                # The page id list is not empty - only some pages are in the request
                self.log.info(f"Locking pages for '{file_group}': {page_ids}")
                locked_file_groups[file_group].extend(page_ids)
                self.log.info(f"Locked pages of '{file_group}': {locked_file_groups[file_group]}")
            else:
                # Lock all pages with a single value
                self.log.info(f"Locking pages for '{file_group}': {self.placeholder_all_pages}")
                locked_file_groups[file_group].append(self.placeholder_all_pages)

    def unlock_pages(self, workspace_key: str, output_file_grps: List[str], page_ids: List[str]) -> None:
        """
        Unlock `page_ids` in each of the `output_file_grps` of the workspace.

        When `page_ids` is empty, the all-pages placeholder is removed instead. A missing
        placeholder is logged and otherwise ignored, so unlocking never raises.
        """
        locked_file_groups = self.locked_pages.get(workspace_key)
        if not locked_file_groups:
            self.log.info(f"No entry found in the locked pages cache for workspace key: {workspace_key}")
            return
        # Build the lookup set once instead of scanning `page_ids` for every locked page
        pages_to_unlock = set(page_ids) if page_ids else set()
        for file_group in output_file_grps:
            if file_group not in locked_file_groups:
                continue
            if page_ids:
                # Unlock the previously locked pages
                self.log.info(f"Unlocking pages of '{file_group}': {page_ids}")
                locked_file_groups[file_group] = [
                    page for page in locked_file_groups[file_group] if page not in pages_to_unlock
                ]
                self.log.info(f"Remaining locked pages of '{file_group}': "
                              f"{locked_file_groups[file_group]}")
            else:
                # Remove the single variable used to indicate all pages are locked
                self.log.info(f"Unlocking all pages for: {file_group}")
                try:
                    locked_file_groups[file_group].remove(self.placeholder_all_pages)
                except ValueError:
                    # The file group was not locked as a whole; nothing to remove
                    self.log.warning(f"No all-pages lock found for: {file_group}")
89
|
|
|
|
90
|
|
|
|
91
|
|
|
class CacheProcessingRequests:
    """
    In-memory buffer of processing requests per workspace key, plus a
    per-workspace counter of processing jobs that are still active.
    """

    def __init__(self) -> None:
        self.log = getLogger("ocrd_network.server_cache.processing_requests")
        log_file = get_cache_processing_requests_logging_file_path()
        configure_file_handler_with_formatter(self.log, log_file=log_file, mode="a")

        # Used for buffering/caching processing requests in the Processing Server
        # Key: `path_to_mets` if already resolved else `workspace_id`
        # Value: Queue that holds PYJobInput elements
        self.processing_requests: Dict[str, List[PYJobInput]] = {}

        # Used for tracking of active processing jobs for a workspace to decide
        # when to shut down a METS Server instance for that workspace
        # Key: `path_to_mets` if already resolved else `workspace_id`
        # Value: integer which holds the amount of jobs pushed to the RabbitMQ
        # but no internal callback was yet invoked
        self.processing_counter: Dict[str, int] = {}

    @staticmethod
    async def __check_if_job_deps_met(dependencies: List[str]) -> bool:
        """
        Return True only if every job in `dependencies` is in state `JobState.success`.

        A dependency whose database lookup raises `ValueError` counts as not met.
        """
        for dependency_job_id in dependencies:
            try:
                dependency_job_state = (await db_get_processing_job(dependency_job_id)).state
            except ValueError:
                # job_id not (yet) in db. Dependency not met
                return False
            # Found a dependent job whose state is not success
            if dependency_job_state != JobState.success:
                return False
        return True

    def __print_job_input_debug_message(self, job_input: PYJobInput) -> None:
        """Log the processor name, page ids, job id and dependencies of `job_input`."""
        debug_message = "Processing job input"
        debug_message += f", processor: {job_input.processor_name}"
        debug_message += f", page ids: {job_input.page_id}"
        debug_message += f", job id: {job_input.job_id}"
        debug_message += f", job depends on: {job_input.depends_on}"
        self.log.info(debug_message)

    async def consume_cached_requests(self, workspace_key: str) -> List[PYJobInput]:
        """
        Remove and return the cached requests of a workspace that are ready to run.

        A request is ready when it has no dependencies or when all of its
        dependencies are met. Requests that are not ready stay in the cache.
        """
        if not self.has_workspace_cached_requests(workspace_key=workspace_key):
            self.log.info(f"No jobs to be consumed for workspace key: {workspace_key}")
            return []
        found_consume_requests = []
        # Iterate over a snapshot: the queue may be modified while awaiting the database
        for current_element in list(self.processing_requests[workspace_key]):
            # Request has other job dependencies
            if current_element.depends_on:
                satisfied_dependencies = await self.__check_if_job_deps_met(current_element.depends_on)
                if not satisfied_dependencies:
                    continue
            found_consume_requests.append(current_element)
        found_requests = []
        for found_element in found_consume_requests:
            try:
                self.processing_requests[workspace_key].remove(found_element)
            except ValueError:
                # The ValueError is not an issue since the element was removed by another instance
                continue
            self.__print_job_input_debug_message(job_input=found_element)
            found_requests.append(found_element)
        return found_requests

    @call_sync
    async def sync_consume_cached_requests(self, workspace_key: str) -> List[PYJobInput]:
        # A synchronous wrapper around the async method
        return await self.consume_cached_requests(workspace_key=workspace_key)

    def update_request_counter(self, workspace_key: str, by_value: int) -> int:
        """
        A method used to increase/decrease the internal counter of some workspace_key by `by_value`.
        Returns the value of the updated counter.
        """
        # If a record counter of this workspace key does not exist
        # in the requests counter cache yet, create one and assign 0.
        # Checking the key itself avoids reporting a counter that has
        # dropped back to 0 as newly created.
        if workspace_key not in self.processing_counter:
            self.log.info(f"Creating an internal request counter for workspace key: {workspace_key}")
            self.processing_counter[workspace_key] = 0
        self.processing_counter[workspace_key] += by_value
        self.log.info(f"The new request counter of {workspace_key}: {self.processing_counter[workspace_key]}")
        return self.processing_counter[workspace_key]

    def cache_request(self, workspace_key: str, data: PYJobInput) -> None:
        """Append the processing request `data` to the queue of `workspace_key`."""
        # If a record queue of this workspace key does not exist in the requests cache.
        # An existing but empty queue is reused rather than replaced.
        if workspace_key not in self.processing_requests:
            self.log.info(f"Creating an internal request queue for workspace_key: {workspace_key}")
            self.processing_requests[workspace_key] = []
        self.__print_job_input_debug_message(job_input=data)
        # Add the processing request to the end of the internal queue
        self.log.info(f"Caching a processing request of {workspace_key}: {data.job_id}")
        self.processing_requests[workspace_key].append(data)

    async def cancel_dependent_jobs(self, workspace_key: str, processing_job_id: str) -> List[PYJobInput]:
        """
        Cancel all cached requests that depend, directly or transitively, on `processing_job_id`.

        Each cancelled request is removed from the cache and its job is set to
        `JobState.cancelled` in the database.

        :return: the list of cancelled requests
        """
        if not self.has_workspace_cached_requests(workspace_key=workspace_key):
            self.log.info(f"No jobs to be cancelled for workspace key: {workspace_key}")
            return []
        self.log.info(f"Cancelling jobs dependent on job id: {processing_job_id}")
        # Requests without dependencies (empty or unset `depends_on`) can never be dependents
        found_cancel_requests = [
            cached_request for cached_request in self.processing_requests[workspace_key]
            if cached_request.depends_on and processing_job_id in cached_request.depends_on
        ]
        cancelled_jobs = []
        for cancel_element in found_cancel_requests:
            try:
                self.processing_requests[workspace_key].remove(cancel_element)
                self.log.info(f"For job id: '{processing_job_id}', cancelling job id: '{cancel_element.job_id}'")
                cancelled_jobs.append(cancel_element)
                await db_update_processing_job(job_id=cancel_element.job_id, state=JobState.cancelled)
                # Recursively cancel dependent jobs for the cancelled job
                recursively_cancelled = await self.cancel_dependent_jobs(
                    workspace_key=workspace_key, processing_job_id=cancel_element.job_id
                )
                # Add the recursively cancelled jobs to the main list of cancelled jobs
                cancelled_jobs.extend(recursively_cancelled)
            except ValueError:
                # The ValueError is not an issue since the element was removed by another instance
                # (this includes removal by a recursive call for an earlier element)
                continue
        return cancelled_jobs

    @call_sync
    async def sync_cancel_dependent_jobs(self, workspace_key: str, processing_job_id: str) -> List[PYJobInput]:
        # A synchronous wrapper around the async method
        return await self.cancel_dependent_jobs(workspace_key=workspace_key, processing_job_id=processing_job_id)

    async def is_caching_required(self, job_dependencies: List[str]) -> bool:
        """Return True if at least one of `job_dependencies` is not met yet."""
        if not job_dependencies:
            return False  # no dependencies found
        if await self.__check_if_job_deps_met(job_dependencies):
            return False  # all dependencies are met
        return True

    @call_sync
    async def sync_is_caching_required(self, job_dependencies: List[str]) -> bool:
        # A synchronous wrapper around the async method
        return await self.is_caching_required(job_dependencies=job_dependencies)

    def has_workspace_cached_requests(self, workspace_key: str) -> bool:
        """Return True if at least one request is cached for `workspace_key`."""
        if workspace_key not in self.processing_requests:
            self.log.info(f"In processing requests cache, no workspace key found: {workspace_key}")
            return False
        cached_requests = self.processing_requests[workspace_key]
        if not cached_requests:
            self.log.info(f"The processing requests cache is empty for workspace key: {workspace_key}")
            return False
        self.log.info(f"The processing requests cache has {len(cached_requests)} "
                      f"entries for workspace key: {workspace_key} ")
        return True
237
|
|
|
|