from __future__ import annotations
from typing import Dict, List

from ocrd_utils import getLogger
from .constants import JobState, SERVER_ALL_PAGES_PLACEHOLDER
from .database import db_get_processing_job, db_update_processing_job
from .logging_utils import (
    configure_file_handler_with_formatter,
    get_cache_locked_pages_logging_file_path,
    get_cache_processing_requests_logging_file_path
)
from .models import PYJobInput
from .utils import call_sync

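# Overview comment (added, not part of the original module): the two caches below are
# used by the OCR-D Processing Server.
# - CacheLockedPages tracks which pages of a workspace are locked per output file group,
#   so that concurrent requests writing to the same pages of the same file group get buffered.
# - CacheProcessingRequests buffers processing requests whose job dependencies are not met
#   yet and counts the jobs in flight per workspace.
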
class CacheLockedPages:
    def __init__(self) -> None:
        self.log = getLogger("ocrd_network.server_cache.locked_pages")
        log_file = get_cache_locked_pages_logging_file_path()
        configure_file_handler_with_formatter(self.log, log_file=log_file, mode="a")

        # Used for keeping track of locked pages for a workspace
        # Key: `path_to_mets` if already resolved else `workspace_id`
        # Value: A dictionary where each key is an output file group
        # and each value is a list of strings representing the locked pages
        self.locked_pages: Dict[str, Dict[str, List[str]]] = {}
        # Used as a placeholder to lock all pages when no page_id is specified
        self.placeholder_all_pages: str = SERVER_ALL_PAGES_PLACEHOLDER

    def check_if_locked_pages_for_output_file_grps(
        self, workspace_key: str, output_file_grps: List[str], page_ids: List[str]
    ) -> bool:
        """Return True if any of the requested pages is already locked for any of the output file groups."""
        if not self.locked_pages.get(workspace_key, None):
            self.log.info(f"No entry found in the locked pages cache for workspace key: {workspace_key}")
            return False
        debug_message = "Caching the received request due to locked output file grp pages."
        for file_group in output_file_grps:
            if file_group in self.locked_pages[workspace_key]:
                if self.placeholder_all_pages in self.locked_pages[workspace_key][file_group]:
                    self.log.debug(debug_message)
                    return True
                if not set(self.locked_pages[workspace_key][file_group]).isdisjoint(page_ids):
                    self.log.debug(debug_message)
                    return True
        return False

    def get_locked_pages(self, workspace_key: str) -> Dict[str, List[str]]:
        if not self.locked_pages.get(workspace_key, None):
            self.log.info(f"No locked pages available for workspace key: {workspace_key}")
            return {}
        return self.locked_pages[workspace_key]

    def lock_pages(self, workspace_key: str, output_file_grps: List[str], page_ids: List[str]) -> None:
        if not self.locked_pages.get(workspace_key, None):
            self.log.info(f"No entry found in the locked pages cache for workspace key: {workspace_key}")
            self.log.info(f"Creating an entry in the locked pages cache for workspace key: {workspace_key}")
            self.locked_pages[workspace_key] = {}
        for file_group in output_file_grps:
            if file_group not in self.locked_pages[workspace_key]:
                self.log.info(f"Creating an empty list for output file grp: {file_group}")
                self.locked_pages[workspace_key][file_group] = []
            # The page id list is not empty - only some pages are in the request
            if page_ids:
                self.log.info(f"Locking pages for '{file_group}': {page_ids}")
                self.locked_pages[workspace_key][file_group].extend(page_ids)
                self.log.info(f"Locked pages of '{file_group}': {self.locked_pages[workspace_key][file_group]}")
            else:
                # Lock all pages with a single value
                self.log.info(f"Locking pages for '{file_group}': {self.placeholder_all_pages}")
                self.locked_pages[workspace_key][file_group].append(self.placeholder_all_pages)

    def unlock_pages(self, workspace_key: str, output_file_grps: List[str], page_ids: List[str]) -> None:
        if not self.locked_pages.get(workspace_key, None):
            self.log.info(f"No entry found in the locked pages cache for workspace key: {workspace_key}")
            return
        for file_group in output_file_grps:
            if file_group in self.locked_pages[workspace_key]:
                if page_ids:
                    # Unlock the previously locked pages
                    self.log.info(f"Unlocking pages of '{file_group}': {page_ids}")
                    self.locked_pages[workspace_key][file_group] = \
                        [x for x in self.locked_pages[workspace_key][file_group] if x not in page_ids]
                    self.log.info(f"Remaining locked pages of '{file_group}': "
                                  f"{self.locked_pages[workspace_key][file_group]}")
                else:
                    # Remove the placeholder value used to indicate that all pages are locked
                    self.log.info(f"Unlocking all pages for: {file_group}")
                    self.locked_pages[workspace_key][file_group].remove(self.placeholder_all_pages)
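
# Illustrative sketch (added, not part of the original module): one way a Processing
# Server could use CacheLockedPages around a job for a workspace. The workspace key,
# output file group and page ids below are made-up example values.
def _example_locked_pages_usage() -> None:
    cache = CacheLockedPages()
    workspace_key = "/data/example_workspace/mets.xml"  # hypothetical workspace key
    file_grps = ["OCR-D-EXAMPLE-OUT"]  # hypothetical output file group
    page_ids = ["PHYS_0001", "PHYS_0002"]  # hypothetical page ids
    # Only dispatch if none of the requested pages are currently locked for these file groups
    if not cache.check_if_locked_pages_for_output_file_grps(
        workspace_key=workspace_key, output_file_grps=file_grps, page_ids=page_ids
    ):
        cache.lock_pages(workspace_key=workspace_key, output_file_grps=file_grps, page_ids=page_ids)
        # ... dispatch the processing job here ...
        cache.unlock_pages(workspace_key=workspace_key, output_file_grps=file_grps, page_ids=page_ids)
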

class CacheProcessingRequests:
    def __init__(self) -> None:
        self.log = getLogger("ocrd_network.server_cache.processing_requests")
        log_file = get_cache_processing_requests_logging_file_path()
        configure_file_handler_with_formatter(self.log, log_file=log_file, mode="a")

        # Used for buffering/caching processing requests in the Processing Server
        # Key: `path_to_mets` if already resolved else `workspace_id`
        # Value: a list, used as a FIFO queue, of PYJobInput elements
        self.processing_requests: Dict[str, List[PYJobInput]] = {}

        # Used for tracking active processing jobs for a workspace to decide
        # when to shut down the METS Server instance of that workspace
        # Key: `path_to_mets` if already resolved else `workspace_id`
        # Value: integer holding the number of jobs pushed to the RabbitMQ queue
        # for which no internal callback has been invoked yet
        self.processing_counter: Dict[str, int] = {}

    @staticmethod
    async def __check_if_job_deps_met(dependencies: List[str]) -> bool:
        # Check the states of all dependent jobs
        for dependency_job_id in dependencies:
            try:
                dependency_job_state = (await db_get_processing_job(dependency_job_id)).state
                # Found a dependent job whose state is not success
                if dependency_job_state != JobState.success:
                    return False
            except ValueError:
                # job_id not (yet) in db. Dependency not met
                return False
        return True

    def __print_job_input_debug_message(self, job_input: PYJobInput):
        debug_message = "Processing job input"
        debug_message += f", processor: {job_input.processor_name}"
        debug_message += f", page ids: {job_input.page_id}"
        debug_message += f", job id: {job_input.job_id}"
        debug_message += f", job depends on: {job_input.depends_on}"
        self.log.info(debug_message)

    async def consume_cached_requests(self, workspace_key: str) -> List[PYJobInput]:
        """Remove and return all cached requests of `workspace_key` whose job dependencies are met."""
        if not self.has_workspace_cached_requests(workspace_key=workspace_key):
            self.log.info(f"No jobs to be consumed for workspace key: {workspace_key}")
            return []
        found_consume_requests = []
        for current_element in self.processing_requests[workspace_key]:
            # Request has other job dependencies
            if current_element.depends_on:
                satisfied_dependencies = await self.__check_if_job_deps_met(current_element.depends_on)
                if not satisfied_dependencies:
                    continue
            found_consume_requests.append(current_element)
        found_requests = []
        for found_element in found_consume_requests:
            try:
                self.processing_requests[workspace_key].remove(found_element)
                # self.log.debug(f"Found cached request to be processed: {found_element}")
                self.__print_job_input_debug_message(job_input=found_element)
                found_requests.append(found_element)
            except ValueError:
                # The ValueError is not an issue since the element was removed by another instance
                continue
        return found_requests

    @call_sync
    async def sync_consume_cached_requests(self, workspace_key: str) -> List[PYJobInput]:
        # A synchronous wrapper around the async method
        return await self.consume_cached_requests(workspace_key=workspace_key)

    def update_request_counter(self, workspace_key: str, by_value: int) -> int:
        """
        Increase (or decrease) the internal counter of `workspace_key` by `by_value`
        and return the updated counter value.
        """
        # If no counter record exists for this workspace key yet, create one starting at 0
        if not self.processing_counter.get(workspace_key, None):
            self.log.info(f"Creating an internal request counter for workspace key: {workspace_key}")
            self.processing_counter[workspace_key] = 0
        self.processing_counter[workspace_key] += by_value
        self.log.info(f"The new request counter of {workspace_key}: {self.processing_counter[workspace_key]}")
        return self.processing_counter[workspace_key]

    def cache_request(self, workspace_key: str, data: PYJobInput):
        # If a record queue of this workspace key does not exist in the requests cache
        if not self.processing_requests.get(workspace_key, None):
            self.log.info(f"Creating an internal request queue for workspace_key: {workspace_key}")
            self.processing_requests[workspace_key] = []
        self.__print_job_input_debug_message(job_input=data)
        # Add the processing request to the end of the internal queue
        self.log.info(f"Caching a processing request of {workspace_key}: {data.job_id}")
        self.processing_requests[workspace_key].append(data)

    async def cancel_dependent_jobs(self, workspace_key: str, processing_job_id: str) -> List[PYJobInput]:
        """
        Remove from the cache of `workspace_key` all requests that depend on `processing_job_id`,
        mark them as cancelled in the database, and do the same recursively for their dependents.
        Returns the list of cancelled job inputs.
        """
        if not self.has_workspace_cached_requests(workspace_key=workspace_key):
            self.log.info(f"No jobs to be cancelled for workspace key: {workspace_key}")
            return []
        self.log.info(f"Cancelling jobs dependent on job id: {processing_job_id}")
        found_cancel_requests = []
        for current_element in self.processing_requests[workspace_key]:
            if processing_job_id in current_element.depends_on:
                found_cancel_requests.append(current_element)
        cancelled_jobs = []
        for cancel_element in found_cancel_requests:
            try:
                self.processing_requests[workspace_key].remove(cancel_element)
                self.log.info(f"For job id: '{processing_job_id}', cancelling job id: '{cancel_element.job_id}'")
                cancelled_jobs.append(cancel_element)
                await db_update_processing_job(job_id=cancel_element.job_id, state=JobState.cancelled)
                # Recursively cancel dependent jobs for the cancelled job
                recursively_cancelled = await self.cancel_dependent_jobs(
                    workspace_key=workspace_key, processing_job_id=cancel_element.job_id
                )
                # Add the recursively cancelled jobs to the main list of cancelled jobs
                cancelled_jobs.extend(recursively_cancelled)
            except ValueError:
                # The ValueError is not an issue since the element was removed by another instance
                continue
        return cancelled_jobs

    @call_sync
    async def sync_cancel_dependent_jobs(self, workspace_key: str, processing_job_id: str) -> List[PYJobInput]:
        # A synchronous wrapper around the async method
        return await self.cancel_dependent_jobs(workspace_key=workspace_key, processing_job_id=processing_job_id)

    async def is_caching_required(self, job_dependencies: List[str]) -> bool:
        if not len(job_dependencies):
            return False  # no dependencies found
        if await self.__check_if_job_deps_met(job_dependencies):
            return False  # all dependencies are met
        return True

    @call_sync
    async def sync_is_caching_required(self, job_dependencies: List[str]) -> bool:
        # A synchronous wrapper around the async method
        return await self.is_caching_required(job_dependencies=job_dependencies)

    def has_workspace_cached_requests(self, workspace_key: str) -> bool:
        if not self.processing_requests.get(workspace_key, None):
            self.log.info(f"No entry found in the processing requests cache for workspace key: {workspace_key}")
            return False
        if not len(self.processing_requests[workspace_key]):
            self.log.info(f"The processing requests cache is empty for workspace key: {workspace_key}")
            return False
        self.log.info(f"The processing requests cache has {len(self.processing_requests[workspace_key])} "
                      f"entries for workspace key: {workspace_key}")
        return True
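
# Illustrative sketch (added, not part of the original module): how a Processing Server
# might combine CacheProcessingRequests calls around an incoming request. The workspace
# key is a made-up example value; the PYJobInput instance is taken as a parameter because
# its exact fields are defined in .models and are not assumed here.
async def _example_processing_requests_usage(job_input: PYJobInput) -> None:
    cache = CacheProcessingRequests()
    workspace_key = "/data/example_workspace/mets.xml"  # hypothetical workspace key
    if await cache.is_caching_required(job_input.depends_on or []):
        # Some dependency is not finished yet - buffer the request for later
        cache.cache_request(workspace_key=workspace_key, data=job_input)
    else:
        # Dependencies are met (or absent) - the request can be pushed to the queue;
        # count it so the METS Server shutdown decision stays accurate
        cache.update_request_counter(workspace_key=workspace_key, by_value=1)
    # Later, e.g. from a result callback, consume whatever became ready
    for ready_request in await cache.consume_cached_requests(workspace_key=workspace_key):
        cache.update_request_counter(workspace_key=workspace_key, by_value=1)
        # ... push `ready_request` to the processing queue here ...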