Total Complexity | 60 |
Total Lines | 426 |
Duplicated Lines | 4.23 % |
Changes | 0 |
Duplicated code is one of the most pungent code smells. A commonly used rule of thumb is to restructure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like senaite.core.datamanagers.field.sample_analyses often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | # -*- coding: utf-8 -*- |
||
2 | # |
||
3 | # This file is part of SENAITE.CORE. |
||
4 | # |
||
5 | # SENAITE.CORE is free software: you can redistribute it and/or modify it under |
||
6 | # the terms of the GNU General Public License as published by the Free Software |
||
7 | # Foundation, version 2. |
||
8 | # |
||
9 | # This program is distributed in the hope that it will be useful, but WITHOUT |
||
10 | # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
||
11 | # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
||
12 | # details. |
||
13 | # |
||
14 | # You should have received a copy of the GNU General Public License along with |
||
15 | # this program; if not, write to the Free Software Foundation, Inc., 51 |
||
16 | # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
||
17 | # |
||
18 | # Copyright 2018-2025 by its authors. |
||
19 | # Some rights reserved, see README and LICENSE. |
||
20 | |||
21 | import itertools |
||
22 | |||
23 | from AccessControl import Unauthorized |
||
24 | from bika.lims import api |
||
25 | from bika.lims import logger |
||
26 | from bika.lims.api.security import check_permission |
||
27 | from bika.lims.interfaces import IAnalysis |
||
28 | from bika.lims.interfaces import IAnalysisService |
||
29 | from bika.lims.interfaces import ISubmitted |
||
30 | from bika.lims.utils.analysis import create_analysis |
||
31 | from senaite.core.catalog import ANALYSIS_CATALOG |
||
32 | from senaite.core.catalog import SETUP_CATALOG |
||
33 | from senaite.core.datamanagers.base import FieldDataManager |
||
34 | from senaite.core.permissions import AddAnalysis |
||
35 | |||
# Review states in which an analysis is considered detached from the sample:
# such analyses are neither auto-removed nor treated as active duplicates
DETACHED_STATES = ["cancelled", "retracted", "rejected"]
||
37 | |||
38 | |||
class SampleAnalysesFieldDataManager(FieldDataManager):
    """Data Manager for Routine Analyses

    Provides the `get`/`set` accessors for the routine analyses assigned to
    an Analysis Request (sample), taking care of service dependency
    resolution, result ranges, prices, hidden flags and the relocation of
    analyses across sample partitions (ancestors/descendants).

    NOTE: the original implementation relied on Python 2 `map`/`filter`
    returning lists. Those calls were replaced with eager comprehensions so
    the behavior is identical on Python 2 and correct on Python 3, where the
    lazy iterators would either raise (`set(services + dependencies)`) or
    silently skip the intended side effects (`map(..., services)`).
    """

    def __init__(self, context, request, field):
        self.context = context
        self.request = request
        self.field = field

    def get(self, **kw):
        """Returns a list of Analyses assigned to this AR

        Return a list of catalog brains unless `full_objects=True` is passed.
        Other keyword arguments that match catalog indexes are injected into
        the search query against senaite_catalog_analysis.

        :param kw: Keyword arguments to inject in the search query
        :returns: A list of Analysis Objects/Catalog Brains
        """
        # Filter out parameters from kw that don't match with indexes
        catalog = api.get_tool(ANALYSIS_CATALOG)
        indexes = catalog.indexes()
        query = {key: value for key, value in kw.items() if key in indexes}

        query["portal_type"] = "Analysis"
        # getAncestorsUIDs also matches analyses living in partitions
        query["getAncestorsUIDs"] = api.get_uid(self.context)
        query["sort_on"] = kw.get("sort_on", "sortable_title")
        query["sort_order"] = kw.get("sort_order", "ascending")

        # Do the search against the catalog
        brains = catalog(query)
        if kw.get("full_objects", False):
            # NOTE: explicit list so a real list (not a lazy map object) is
            # returned on Python 3 as well
            return [api.get_object(brain) for brain in brains]
        return list(brains)

    def set(self, items, prices, specs, hidden, **kw):
        """Set/Assign Analyses to this AR

        Analyses for the services passed-in (dependencies included) that are
        not yet assigned are created. Analyses that are no longer selected
        are removed, unless they were already submitted or are in a detached
        state (cancelled, retracted, rejected).

        :param items: List of Analysis objects/brains, AnalysisService
                      objects/brains and/or Analysis Service uids
        :type items: list
        :param prices: Mapping of AnalysisService UID -> price
        :type prices: dict
        :param specs: List of AnalysisService UID -> Result Range mappings
        :type specs: list
        :param hidden: List of AnalysisService UID -> Hidden mappings
        :type hidden: list
        :raises TypeError: if items is not a list or tuple
        :raises Unauthorized: if the sample is inactive or the current user
            lacks the AddAnalysis permission
        """
        if items is None:
            items = []

        # Bail out if the items is not a list type
        if not isinstance(items, (list, tuple)):
            raise TypeError(
                "Items parameter must be a tuple or list, got '{}'".format(
                    type(items)))

        # Bail out if the AR is inactive
        if not api.is_active(self.context):
            raise Unauthorized("Inactive ARs can not be modified")

        # Bail out if the user has not the right permission
        if not check_permission(AddAnalysis, self.context):
            raise Unauthorized("You do not have the '{}' permission"
                               .format(AddAnalysis))

        # Convert the items to a valid list of AnalysisServices, discarding
        # anything that could not be resolved to a service
        services = [service for service in
                    (self._to_service(item) for item in items) if service]

        # Calculate dependencies of the selected services
        dependencies = [dependency for service in services
                        for dependency in service.getServiceDependencies()]

        # Merge dependencies and services
        services = set(services + dependencies)

        # Modify existing AR specs with new form values of selected analyses
        specs = self.resolve_specs(self.context, specs)

        # Add/update analyses for all selected services
        params = dict(prices=prices, hidden=hidden, specs=specs)
        for service in services:
            self.add_analysis(self.context, service, **params)

        # Get all analyses (those from descendants included)
        analyses = list(self.context.objectValues("Analysis"))
        analyses.extend(self.get_analyses_from_descendants(self.context))

        # Analyses to remove: those whose service is no longer selected,
        # but retaining submitted analyses and those in detached states
        uids = {api.get_uid(service) for service in services}
        to_remove = [an for an in analyses
                     if an.getServiceUID() not in uids
                     and not ISubmitted.providedBy(an)
                     and api.get_review_status(an) not in DETACHED_STATES]

        # Remove analyses
        for analysis in to_remove:
            self.remove_analysis(analysis)

    def resolve_specs(self, instance, results_ranges):
        """Returns a dictionary where the key is the service_uid and the value
        is its results range. The dictionary is made by extending the
        results_ranges passed-in with the Sample's ResultsRanges (a copy of
        the specifications initially set)

        :param instance: the Sample (Analysis Request)
        :param results_ranges: list of result range dicts, or None
        :returns: dict of service uid -> result range
        """
        rrs = results_ranges or []

        # Sample's Results ranges
        sample_rrs = instance.getResultsRange()

        # Ensure all subfields from specification are kept and missing values
        # for subfields are filled in accordance with the specs
        rrs = [self.resolve_range(rr, sample_rrs) for rr in rrs]

        # Append those from sample that are missing in the ranges passed-in
        service_uids = [rr["uid"] for rr in rrs]
        rrs.extend([rr for rr in sample_rrs
                    if rr["uid"] not in service_uids])

        # Create a dict for easy access to results ranges
        return {rr["uid"]: rr for rr in rrs}

    def resolve_range(self, result_range, sample_result_ranges):
        """Resolves the range by adding the uid if not present and filling the
        missing subfield values with those that come from the Sample
        specification if they are not present in the result_range passed-in

        :param result_range: result range dict to resolve
        :param sample_result_ranges: result ranges set at sample level
        :returns: a result range dict with the uid subfield resolved
        """
        # Resolve result_range to make sure it contains the uid subfield
        rrs = self.resolve_uid(result_range)
        uid = rrs.get("uid")

        for sample_rr in sample_result_ranges:
            if uid and sample_rr.get("uid") == uid:
                # Keep same fields from sample, giving priority to the values
                # from the result range passed-in
                rr = sample_rr.copy()
                rr.update(rrs)
                return rr

        # Return the original with no changes
        return rrs

    def resolve_uid(self, result_range):
        """Resolves the uid key for the result_range passed in if it does not
        exist when it contains a keyword

        :param result_range: result range dict, maybe without a valid uid
        :returns: a copy of the result range with the uid resolved, when the
            keyword maps to exactly one AnalysisService
        """
        value = result_range.copy()
        uid = value.get("uid")
        if api.is_uid(uid) and uid != "0":
            return value

        # uid key does not exist or is not valid, try to infer from keyword
        keyword = value.get("keyword")
        if keyword:
            query = dict(portal_type="AnalysisService", getKeyword=keyword)
            brains = api.search(query, SETUP_CATALOG)
            # Only assign the uid if the keyword is unambiguous
            if len(brains) == 1:
                value["uid"] = api.get_uid(brains[0])
        return value

    def resolve_conditions(self, analysis):
        """Returns the conditions to be applied to this analysis by merging
        those already set at sample level with the defaults from the service,
        sorted in the same order as defined in the service

        :param analysis: analysis object to resolve the conditions for
        :returns: list of condition dicts
        """
        service = analysis.getAnalysisService()
        default_conditions = service.getConditions()

        # Extract the conditions set for this analysis already
        existing = analysis.getConditions()
        existing_titles = [cond.get("title") for cond in existing]

        # Add only those default conditions that are missing
        missing = [cond for cond in default_conditions
                   if cond.get("title") not in existing_titles]

        # Sort them to match with same order as in service
        titles = [condition.get("title") for condition in default_conditions]

        def index(condition):
            # Conditions unknown to the service are pushed to the end
            cond_title = condition.get("title")
            if cond_title in titles:
                return titles.index(cond_title)
            return len(titles)

        conditions = existing + missing
        return sorted(conditions, key=index)

    def add_analysis(self, instance, service, **kwargs):
        """Adds (or updates) the analyses for the given service to the
        instance (sample or partition) passed-in

        :param instance: the sample or partition the analysis belongs to
        :param service: the Analysis Service to add the analysis for
        :param kwargs: `prices`, `hidden` and `specs` mappings
        """
        service_uid = api.get_uid(service)

        # Ensure we have suitable parameters
        specs = kwargs.get("specs") or {}

        # Get the hidden status for the service
        hidden = [item for item in (kwargs.get("hidden") or [])
                  if item.get("uid") == service_uid]
        # NOTE(review): falls back to the service default not only when there
        # is no entry for this service, but also when the entry's value is
        # falsy (and/or idiom kept for backwards compatibility)
        hidden = hidden and hidden[0].get("hidden") or service.getHidden()

        # Get the price for the service
        prices = kwargs.get("prices") or {}
        price = prices.get(service_uid) or service.getPrice()

        # Get the default result for the service
        default_result = service.getDefaultResult()

        # Gets the analysis or creates the analysis for this service
        # Note this returns a list, because is possible to have multiple
        # partitions with same analysis
        analyses = self.resolve_analyses(instance, service)

        # Filter out analyses in detached states
        # This allows to re-add an analysis that was retracted or cancelled
        analyses = [an for an in analyses
                    if api.get_workflow_status_of(an) not in DETACHED_STATES]

        if not analyses:
            # Create the analysis
            analysis = create_analysis(instance, service)
            analyses.append(analysis)

        for analysis in analyses:
            # Set the hidden status
            analysis.setHidden(hidden)

            # Set the price of the Analysis
            analysis.setPrice(price)

            # Set the internal use status from the parent sample
            parent_sample = analysis.getRequest()
            analysis.setInternalUse(parent_sample.getInternalUse())

            # Set the default result to the analysis, but only if no result
            # has been captured yet
            if not analysis.getResult() and default_result:
                analysis.setResult(default_result)
                analysis.setResultCaptureDate(None)

            # Set the result range to the analysis
            analysis_rr = specs.get(service_uid) or analysis.getResultsRange()
            analysis.setResultsRange(analysis_rr)

            # Set default (pre)conditions
            conditions = self.resolve_conditions(analysis)
            analysis.setConditions(conditions)

            analysis.reindexObject()

    def remove_analysis(self, analysis):
        """Removes a given analysis from the instance

        Unassigns the analysis from its worksheet (if any), unlinks retests
        and deletes attachments that are no longer linked to any analysis

        :param analysis: the analysis object to remove
        """
        # Remember assigned attachments
        # https://github.com/senaite/senaite.core/issues/1025
        attachments = analysis.getAttachment()
        analysis.setAttachment([])

        # If assigned to a worksheet, unassign it before deletion
        worksheet = analysis.getWorksheet()
        if worksheet:
            worksheet.removeAnalysis(analysis)

        # handle retest source deleted
        retest = analysis.getRetest()
        if retest:
            # unset reference link
            retest.setRetestOf(None)

        # Remove the analysis
        # Note the analysis might belong to a partition
        analysis.aq_parent.manage_delObjects(ids=[api.get_id(analysis)])

        # Remove orphaned attachments
        for attachment in attachments:
            if not attachment.getLinkedAnalyses():
                # only delete attachments which are no further linked
                logger.info(
                    "Deleting attachment: {}".format(attachment.getId()))
                attachment_id = api.get_id(attachment)
                api.get_parent(attachment).manage_delObjects(attachment_id)

    def resolve_analyses(self, instance, service):
        """Resolves analyses for the service and instance

        It returns a list, cause for a given sample, multiple analyses for
        same service can exist due to the possibility of having multiple
        partitions. Unassigned analyses found in ancestors are moved into
        the instance passed-in.

        :param instance: sample or partition
        :param service: the Analysis Service to look analyses for
        :returns: list of analysis objects
        """
        analyses = []

        # Does the analysis exists in this instance already?
        instance_analyses = self.get_from_instance(instance, service)
        if instance_analyses:
            analyses.extend(instance_analyses)

        # Does the analysis exists in an ancestor?
        from_ancestor = self.get_from_ancestor(instance, service)
        for ancestor_analysis in from_ancestor:
            # only move non-assigned analyses
            state = api.get_workflow_status_of(ancestor_analysis)
            if state != "unassigned":
                continue
            # Move the analysis into the partition
            analysis_id = api.get_id(ancestor_analysis)
            logger.info("Analysis {} is from an ancestor".format(analysis_id))
            cp = ancestor_analysis.aq_parent.manage_cutObjects(analysis_id)
            instance.manage_pasteObjects(cp)
            analyses.append(instance._getOb(analysis_id))

        # Does the analysis exists in descendants?
        from_descendant = self.get_from_descendant(instance, service)
        analyses.extend(from_descendant)

        return analyses

    def get_analyses_from_descendants(self, instance):
        """Returns all the analyses from descendants

        :param instance: sample (Analysis Request)
        :returns: list of analysis objects from all descendant partitions
        """
        analyses = []
        for descendant in instance.getDescendants(all_descendants=True):
            analyses.extend(descendant.objectValues("Analysis"))
        return analyses

    def get_from_instance(self, instance, service):
        """Returns analyses for the given service from the instance

        :param instance: sample or partition
        :param service: the Analysis Service to look analyses for
        :returns: list of analysis objects
        """
        service_uid = api.get_uid(service)
        # Filter those analyses with same service. Note that a Sample can
        # contain more than one analysis with same keyword because of retests
        return [an for an in instance.objectValues("Analysis")
                if an.getServiceUID() == service_uid]

    def get_from_ancestor(self, instance, service):
        """Returns analyses for the given service from the closest ancestor
        that contains any

        :param instance: sample or partition
        :param service: the Analysis Service to look analyses for
        :returns: list of analysis objects
        """
        ancestor = instance.getParentAnalysisRequest()
        if not ancestor:
            return []

        analyses = self.get_from_instance(ancestor, service)
        return analyses or self.get_from_ancestor(ancestor, service)

    def get_from_descendant(self, instance, service):
        """Returns analyses for the given service from descendants, whole
        descendant hierarchy included

        :param instance: sample or partition
        :param service: the Analysis Service to look analyses for
        :returns: list of analysis objects
        """
        analyses = []
        for descendant in instance.getDescendants():
            # Does the analysis exists in the current descendant?
            descendant_analyses = self.get_from_instance(descendant, service)
            if descendant_analyses:
                analyses.extend(descendant_analyses)

            # Search in descendants from current descendant
            from_descendant = self.get_from_descendant(descendant, service)
            analyses.extend(from_descendant)

        return analyses

    def _to_service(self, thing):
        """Convert to Analysis Service

        :param thing: UID/Catalog Brain/Object/Something
        :returns: Analysis Service object or None
        """
        # Convert UIDs to objects
        if api.is_uid(thing):
            thing = api.get_object_by_uid(thing, None)

        # Bail out if the thing is not a valid object
        if not api.is_object(thing):
            logger.warn("'{}' is not a valid object!".format(repr(thing)))
            return None

        # Ensure we have an object here and not a brain
        obj = api.get_object(thing)

        if IAnalysisService.providedBy(obj):
            return obj

        if IAnalysis.providedBy(obj):
            return obj.getAnalysisService()

        # An object, but neither an Analysis nor AnalysisService?
        # This should never happen.
        portal_type = api.get_portal_type(obj)
        logger.error("ARAnalysesField doesn't accept objects from {} type. "
                     "The object will be dismissed.".format(portal_type))
        return None
||
426 |