Passed
Pull Request — master (#1083)
created by Konstantin (02:49)

ocrd_network.server_cache   B

Complexity

Total Complexity 47

Size/Duplication

Total Lines 240
Duplicated Lines 0%

Importance

Changes 0
Metric   Value
wmc      47
eloc     167
dl       0
loc      240
rs       8.64
c        0
b        0
f        0

13 Methods

Rating   Name   Duplication   Size   Complexity  
B CacheProcessingRequests.consume_cached_requests() 0 25 7
B CacheLockedPages.lock_pages() 0 25 5
B CacheLockedPages.check_if_locked_pages_for_output_file_grps() 0 18 6
A CacheLockedPages.unlock_pages() 0 22 5
A CacheProcessingRequests.update_request_counter() 0 12 2
A CacheProcessingRequests.cache_request() 0 9 2
A CacheLockedPages.get_locked_pages() 0 8 2
A CacheProcessingRequests.__check_if_job_deps_met() 0 13 4
A CacheProcessingRequests.is_caching_required() 0 8 3
B CacheProcessingRequests.cancel_dependent_jobs() 0 29 6
A CacheProcessingRequests.__init__() 0 19 1
A CacheLockedPages.__init__() 0 15 1
A CacheProcessingRequests.has_workspace_cached_requests() 0 8 3

How to fix: Complexity

Complex classes like ocrd_network.server_cache often do many different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
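As a hedged illustration of that advice (not part of the PR), the per-workspace request counting inside CacheProcessingRequests could be extracted into its own component; the class name WorkspaceRequestCounter below is hypothetical:

from typing import Dict


class WorkspaceRequestCounter:
    # Hypothetical Extract Class sketch: isolates the per-workspace job
    # counting that CacheProcessingRequests currently keeps in
    # self.__processing_counter and update_request_counter().

    def __init__(self) -> None:
        self._counters: Dict[str, int] = {}

    def update(self, workspace_key: str, by_value: int) -> int:
        # Create the counter lazily, then shift it by `by_value`
        self._counters[workspace_key] = self._counters.get(workspace_key, 0) + by_value
        return self._counters[workspace_key]

CacheProcessingRequests.update_request_counter() would then simply delegate, e.g. return self.counter.update(workspace_key, by_value), trimming the class. For reference, the full ocrd_network.server_cache source under review is listed below.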

from __future__ import annotations
from typing import Dict, List
from logging import DEBUG, getLogger, FileHandler

from .database import db_get_processing_job, db_update_processing_job
from .models import PYJobInput, StateEnum

__all__ = [
    'CacheLockedPages',
    'CacheProcessingRequests'
]


class CacheLockedPages:
    def __init__(self) -> None:
        self.log = getLogger("ocrd_network.server_cache.locked_pages")
        # TODO: remove this when refactoring the logging
        self.log.setLevel(DEBUG)
        log_fh = FileHandler('/tmp/ocrd_processing_server_cache_locked_pages.log')
        log_fh.setLevel(DEBUG)
        self.log.addHandler(log_fh)

        # Used for keeping track of locked pages for a workspace
        # Key: `path_to_mets` if already resolved else `workspace_id`
        # Value: A dictionary where each dictionary key is the output file group,
        # and the values are lists of strings representing the locked pages
        self.locked_pages: Dict[str, Dict[str, List[str]]] = {}
        # Used as a placeholder to lock all pages when no page_id is specified
        self.placeholder_all_pages: str = "all_pages"

    def check_if_locked_pages_for_output_file_grps(
            self,
            workspace_key: str,
            output_file_grps: List[str],
            page_ids: List[str]
    ) -> bool:
        if not self.locked_pages.get(workspace_key, None):
            self.log.debug(f"No entry found in the locked pages cache for workspace key: {workspace_key}")
            return False
        for output_fileGrp in output_file_grps:
            if output_fileGrp in self.locked_pages[workspace_key]:
                if self.placeholder_all_pages in self.locked_pages[workspace_key][output_fileGrp]:
                    self.log.debug("Caching the received request due to locked output file grp pages")
                    return True
                if not set(self.locked_pages[workspace_key][output_fileGrp]).isdisjoint(page_ids):
                    self.log.debug("Caching the received request due to locked output file grp pages")
                    return True
        return False

    def get_locked_pages(
            self,
            workspace_key: str
    ) -> Dict[str, List[str]]:
        if not self.locked_pages.get(workspace_key, None):
            self.log.debug(f"No locked pages available for workspace key: {workspace_key}")
            return {}
        return self.locked_pages[workspace_key]

    def lock_pages(
            self,
            workspace_key: str,
            output_file_grps: List[str],
            page_ids: List[str]
    ) -> None:
        if not self.locked_pages.get(workspace_key, None):
            self.log.debug(f"No entry found in the locked pages cache for workspace key: {workspace_key}")
            self.log.debug(f"Creating an entry in the locked pages cache for workspace key: {workspace_key}")
            self.locked_pages[workspace_key] = {}

        for output_fileGrp in output_file_grps:
            if output_fileGrp not in self.locked_pages[workspace_key]:
                self.log.debug(f"Creating an empty list for output file grp: {output_fileGrp}")
                self.locked_pages[workspace_key][output_fileGrp] = []
            # The page id list is not empty - only some pages are in the request
            if page_ids:
                self.log.debug(f"Locking pages for `{output_fileGrp}`: {page_ids}")
                self.locked_pages[workspace_key][output_fileGrp].extend(page_ids)
                self.log.debug(f"Locked pages of `{output_fileGrp}`: "
                               f"{self.locked_pages[workspace_key][output_fileGrp]}")
            else:
                # Lock all pages with a single value
                self.log.debug(f"Locking pages for `{output_fileGrp}`: {self.placeholder_all_pages}")
                self.locked_pages[workspace_key][output_fileGrp].append(self.placeholder_all_pages)

    def unlock_pages(
            self,
            workspace_key: str,
            output_file_grps: List[str],
            page_ids: List[str]
    ) -> None:
        if not self.locked_pages.get(workspace_key, None):
            self.log.debug(f"No entry found in the locked pages cache for workspace key: {workspace_key}")
            return
        for output_fileGrp in output_file_grps:
            if output_fileGrp in self.locked_pages[workspace_key]:
                if page_ids:
                    # Unlock the previously locked pages
                    self.log.debug(f"Unlocking pages of `{output_fileGrp}`: {page_ids}")
                    self.locked_pages[workspace_key][output_fileGrp] = \
                        [x for x in self.locked_pages[workspace_key][output_fileGrp] if x not in page_ids]
                    self.log.debug(f"Remaining locked pages of `{output_fileGrp}`: "
                                   f"{self.locked_pages[workspace_key][output_fileGrp]}")
                else:
                    # Remove the single variable used to indicate all pages are locked
                    self.log.debug(f"Unlocking all pages for: {output_fileGrp}")
                    self.locked_pages[workspace_key][output_fileGrp].remove(self.placeholder_all_pages)

class CacheProcessingRequests:
    def __init__(self) -> None:
        self.log = getLogger("ocrd_network.server_cache.processing_requests")
        # TODO: remove this when refactoring the logging
        self.log.setLevel(DEBUG)
        log_fh = FileHandler('/tmp/ocrd_processing_server_cache_processing_requests.log')
        log_fh.setLevel(DEBUG)
        self.log.addHandler(log_fh)

        # Used for buffering/caching processing requests in the Processing Server
        # Key: `path_to_mets` if already resolved else `workspace_id`
        # Value: Queue that holds PYJobInput elements
        self.processing_requests: Dict[str, List[PYJobInput]] = {}

        # Used for tracking active processing jobs for a workspace to decide
        # when to shut down the METS Server instance for that workspace
        # Key: `path_to_mets` if already resolved else `workspace_id`
        # Value: integer which holds the number of jobs pushed to the RabbitMQ
        # for which no internal callback was invoked yet
        self.__processing_counter: Dict[str, int] = {}

    @staticmethod
    async def __check_if_job_deps_met(dependencies: List[str]) -> bool:
        # Check the states of all dependent jobs
        for dependency_job_id in dependencies:
            try:
                dependency_job_state = (await db_get_processing_job(dependency_job_id)).state
            except ValueError:
                # job_id not (yet) in db. Dependency not met
                return False
            # Found a dependent job whose state is not success
            if dependency_job_state != StateEnum.success:
                return False
        return True

    async def consume_cached_requests(self, workspace_key: str) -> List[PYJobInput]:
        if not self.has_workspace_cached_requests(workspace_key=workspace_key):
            self.log.debug(f"No jobs to be consumed for workspace key: {workspace_key}")
            return []
        found_consume_requests = []
        for i, current_element in enumerate(self.processing_requests[workspace_key]):
            # Request has other job dependencies
            if current_element.depends_on:
                satisfied_dependencies = await self.__check_if_job_deps_met(current_element.depends_on)
                if not satisfied_dependencies:
                    continue
            found_consume_requests.append(current_element)
        found_requests = []
        for found_element in found_consume_requests:
            try:
                (self.processing_requests[workspace_key]).remove(found_element)
                # self.log.debug(f"Found cached request to be processed: {found_request}")
                self.log.debug(f"Found cached request: {found_element.processor_name}, {found_element.page_id}, "
                               f"{found_element.job_id}, depends_on: {found_element.depends_on}")
                found_requests.append(found_element)
            except ValueError:
                # The ValueError is not an issue since the
                # element was removed by another instance
                continue
        return found_requests

    def update_request_counter(self, workspace_key: str, by_value: int) -> int:
        """
        Increase/decrease the internal counter of the given workspace_key by `by_value`.
        Returns the value of the updated counter.
        """
        # If a record counter of this workspace key does not exist
        # in the requests counter cache yet, create one and assign 0
        if not self.__processing_counter.get(workspace_key, None):
            self.log.debug(f"Creating an internal request counter for workspace key: {workspace_key}")
            self.__processing_counter[workspace_key] = 0
        self.__processing_counter[workspace_key] = self.__processing_counter[workspace_key] + by_value
        return self.__processing_counter[workspace_key]

    def cache_request(self, workspace_key: str, data: PYJobInput):
        # If a record queue of this workspace key does not exist in the requests cache
        if not self.processing_requests.get(workspace_key, None):
            self.log.debug(f"Creating an internal request queue for workspace_key: {workspace_key}")
            self.processing_requests[workspace_key] = []
        self.log.debug(f"Caching request: {data.processor_name}, {data.page_id}, "
                       f"{data.job_id}, depends_on: {data.depends_on}")
        # Add the processing request to the end of the internal queue
        self.processing_requests[workspace_key].append(data)

    async def cancel_dependent_jobs(self, workspace_key: str, processing_job_id: str) -> List[PYJobInput]:
        if not self.has_workspace_cached_requests(workspace_key=workspace_key):
            self.log.debug(f"No jobs to be cancelled for workspace key: {workspace_key}")
            return []
        self.log.debug(f"Cancelling jobs dependent on job id: {processing_job_id}")
        found_cancel_requests = []
        for i, current_element in enumerate(self.processing_requests[workspace_key]):
            if processing_job_id in current_element.depends_on:
                found_cancel_requests.append(current_element)
        cancelled_jobs = []
        for cancel_element in found_cancel_requests:
            try:
                self.processing_requests[workspace_key].remove(cancel_element)
                self.log.debug(f"For job id: `{processing_job_id}`, "
                               f"cancelling: {cancel_element.job_id}")
                cancelled_jobs.append(cancel_element)
                await db_update_processing_job(job_id=cancel_element.job_id, state=StateEnum.cancelled)
                # Recursively cancel dependent jobs for the cancelled job
                recursively_cancelled = await self.cancel_dependent_jobs(
                    workspace_key=workspace_key,
                    processing_job_id=cancel_element.job_id
                )
                # Add the recursively cancelled jobs to the main list of cancelled jobs
                cancelled_jobs.extend(recursively_cancelled)
            except ValueError:
                # The ValueError is not an issue since the
                # element was removed by another instance
                continue
        return cancelled_jobs

    async def is_caching_required(self, job_dependencies: List[str]) -> bool:
        if not len(job_dependencies):
            # no dependencies found
            return False
        if await self.__check_if_job_deps_met(job_dependencies):
            # all dependencies are met
            return False
        return True

    def has_workspace_cached_requests(self, workspace_key: str) -> bool:
        if not self.processing_requests.get(workspace_key, None):
            self.log.debug(f"In processing requests cache, no workspace key found: {workspace_key}")
            return False
        if not len(self.processing_requests[workspace_key]):
            self.log.debug(f"The processing requests cache is empty for workspace key: {workspace_key}")
            return False
        return True
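For orientation, a minimal usage sketch of the page-locking cache (the workspace key, fileGrp name and page ids below are hypothetical, and the snippet is not part of the PR):

from ocrd_network.server_cache import CacheLockedPages

cache = CacheLockedPages()
workspace_key = "/data/ws1/mets.xml"  # hypothetical resolved METS path

# Lock two pages of an output fileGrp; a later request touching PHYS_0002 must then be cached
cache.lock_pages(workspace_key, output_file_grps=["OCR-D-BIN"], page_ids=["PHYS_0001", "PHYS_0002"])
assert cache.check_if_locked_pages_for_output_file_grps(workspace_key, ["OCR-D-BIN"], ["PHYS_0002"])

# Once the job has finished, release the pages again
cache.unlock_pages(workspace_key, ["OCR-D-BIN"], ["PHYS_0001", "PHYS_0002"])
assert not cache.check_if_locked_pages_for_output_file_grps(workspace_key, ["OCR-D-BIN"], ["PHYS_0002"])

Calling lock_pages() with an empty page_ids list locks all pages of the given fileGrps via the "all_pages" placeholder.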