Passed
Push — 2.x ( 0d0b6a...d19b71 )
by Jordi
09:33 queued 02:25
created

bika.lims.api.analysis.get_formatted_interval()   F

Complexity

Conditions 14

Size

Total Lines 45
Code Lines 31

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 31
dl 0
loc 45
rs 3.6
c 0
b 0
f 0
cc 14
nop 2

How to fix   Complexity   

Complexity

Complex classes like bika.lims.api.analysis.get_formatted_interval() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2025 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import cgi
22
from collections import Mapping
23
24
from bika.lims import api
25
from bika.lims.config import MAX_OPERATORS
26
from bika.lims.config import MIN_OPERATORS
27
from bika.lims.content.analysisspec import ResultsRangeDict
28
from bika.lims.interfaces import IAnalysis
29
from bika.lims.interfaces import IDuplicateAnalysis
30
from bika.lims.interfaces import IReferenceAnalysis
31
from bika.lims.interfaces import IRejected
32
from bika.lims.interfaces import IResultOutOfRange
33
from bika.lims.interfaces import IRetracted
34
from bika.lims.interfaces.analysis import IRequestAnalysis
35
from zope.component._api import getAdapters
36
37
_marker = object()
38
39
40
def is_out_of_range(brain_or_object, result=_marker):
41
    """Checks if the result for the analysis passed in is out of range and/or
42
    out of shoulders range.
43
44
            min                                                   max
45
            warn            min                   max             warn
46
    ·········|---------------|=====================|---------------|·········
47
    ----- out-of-range -----><----- in-range ------><----- out-of-range -----
48
             <-- shoulder --><----- in-range ------><-- shoulder -->
49
50
    :param brain_or_object: A single catalog brain or content object
51
    :param result: Tentative result. If None, use the analysis result
52
    :type brain_or_object: ATContentType/DexterityContentType/CatalogBrain
53
    :returns: Tuple of two elements. The first value is `True` if the result is
54
    out of range and `False` if it is in range. The second value is `True` if
55
    the result is out of shoulder range and `False` if it is in shoulder range
56
    :rtype: (bool, bool)
57
    """
58
    analysis = api.get_object(brain_or_object)
59
    if not IAnalysis.providedBy(analysis) and \
60
            not IReferenceAnalysis.providedBy(analysis):
61
        api.fail("{} is not supported. Needs to be IAnalysis or "
62
                 "IReferenceAnalysis".format(repr(analysis)))
63
64
    if result is _marker:
65
        result = api.safe_getattr(analysis, "getResult", None)
66
67
    if result in [None, '']:
68
        # Empty result
69
        return False, False
70
71
    if IDuplicateAnalysis.providedBy(analysis):
72
        # Result range for duplicate analyses is calculated from the original
73
        # result, applying a variation % in shoulders. If the analysis has
74
        # result options enabled or string results enabled, system returns an
75
        # empty result range for the duplicate: result must match %100 with the
76
        # original result
77
        original = analysis.getAnalysis()
78
        original_result = original.getResult()
79
80
        # Does original analysis have a valid result?
81
        if original_result in [None, '']:
82
            return False, False
83
84
        # Does original result type matches with duplicate result type?
85
        if api.is_floatable(result) != api.is_floatable(original_result):
86
            return True, True
87
88
        # Does analysis has result options enabled or non-floatable?
89
        if analysis.getResultOptions() or not api.is_floatable(original_result):
90
            # Let's always assume the result is 'out from shoulders', cause we
91
            # consider the shoulders are precisely the duplicate variation %
92
            out_of_range = original_result != result
93
            return out_of_range, out_of_range
94
95
    elif not api.is_floatable(result):
96
        results = api.parse_json(result)
97
        if not results:
98
            # Single, non-duplicate, non-floatable result. There is no chance
99
            # to know if the result is out-of-range
100
            return False, False
101
102
        # Multiselect result, remove empty and non-floatable 'sub' results
103
        results = filter(api.is_floatable, results)
104
        if not results:
105
            # No values set yet, we cannot know if out-of-range yet
106
            return False, False
107
108
        # Out of range only when none of the 'sub' results are within range
109
        for sub_result in results:
110
            out_range, out_shoulders = is_out_of_range(analysis, sub_result)
111
            if not out_range:
112
                # sub result within range
113
                return False, False
114
115
        # None of the 'sub' results are within range
116
        return True, True
117
118
    # Convert result to a float
119
    result = api.to_float(result)
120
121
    # Note that routine analyses, duplicates and reference analyses all them
122
    # implement the function getResultRange:
123
    # - For routine analyses, the function returns the valid range based on the
124
    #   specs assigned during the creation process.
125
    # - For duplicates, the valid range is the result of the analysis the
126
    #   the duplicate was generated from +/- the duplicate variation.
127
    # - For reference analyses, getResultRange returns the valid range as
128
    #   indicated in the Reference Sample from which the analysis was created.
129
    result_range = api.safe_getattr(analysis, "getResultsRange", None)
130
    if not result_range:
131
        # No result range defined or the passed in object does not suit
132
        return False, False
133
134
    # Maybe there is a custom adapter
135
    adapters = getAdapters((analysis,), IResultOutOfRange)
136
    for name, adapter in adapters:
137
        ret = adapter(result=result, specification=result_range)
138
        if not ret or not ret.get('out_of_range', False):
139
            continue
140
        if not ret.get('acceptable', True):
141
            # Out of range + out of shoulders
142
            return True, True
143
        # Out of range, but in shoulders
144
        return True, False
145
146
    result_range = ResultsRangeDict(result_range)
147
148
    # The assignment of result as default fallback for min and max guarantees
149
    # the result will be in range also if no min/max values are defined
150
    specs_min = api.to_float(result_range.min, result)
151
    specs_max = api.to_float(result_range.max, result)
152
153
    in_range = False
154
    min_operator = result_range.min_operator
155
    if min_operator == "geq":
156
        in_range = result >= specs_min
157
    else:
158
        in_range = result > specs_min
159
160
    max_operator = result_range.max_operator
161
    if in_range:
162
        if max_operator == "leq":
163
            in_range = result <= specs_max
164
        else:
165
            in_range = result < specs_max
166
167
    # If in range, no need to check shoulders
168
    if in_range:
169
        return False, False
170
171
    # Out of range, check shoulders. If no explicit warn_min or warn_max have
172
    # been defined, no shoulders must be considered for this analysis. Thus, use
173
    # specs' min and max as default fallback values
174
    warn_min = api.to_float(result_range.warn_min, specs_min)
175
    warn_max = api.to_float(result_range.warn_max, specs_max)
176
    in_shoulder = warn_min <= result <= warn_max
177
    return True, not in_shoulder
178
179
180
def get_formatted_interval(analysis_or_results_range, default=_marker):
181
    """Returns a string representation of the interval defined by the results
182
    range passed in
183
184
    :param analysis_or_results_range: analysis, dict or ResultsRangeDict
185
    """
186
    analysis = None
187
    if IAnalysis.providedBy(analysis_or_results_range):
188
        analysis = analysis_or_results_range
189
        results_range = analysis.getResultsRange()
190
    else:
191
        results_range = analysis_or_results_range
192
193
    if not isinstance(results_range, Mapping):
194
        if default is not _marker:
195
            return default
196
        api.fail("Type not supported")
197
198
    results_range = ResultsRangeDict(results_range)
199
    min_str = results_range.min if api.is_floatable(results_range.min) else None
200
    max_str = results_range.max if api.is_floatable(results_range.max) else None
201
202
    if analysis:
203
        min_text = analysis.getResultOptionTextByValue(min_str)
204
        min_str = cgi.escape(min_text) if min_text else None
205
        max_text = analysis.getResultOptionTextByValue(max_str)
206
        max_str = cgi.escape(max_text) if max_text else None
207
208
    if min_str is None and max_str is None:
209
        if default is not _marker:
210
            return default
211
        api.fail("Min and max values are not floatable or not defined")
212
213
    min_operator = results_range.min_operator
214
    max_operator = results_range.max_operator
215
    if max_str is None:
216
        return "{}{}".format(MIN_OPERATORS.getValue(min_operator), min_str)
217
    if min_str is None:
218
        return "{}{}".format(MAX_OPERATORS.getValue(max_operator), max_str)
219
220
    # Both values set. Return an interval
221
    min_bracket = min_operator == 'geq' and '[' or '('
222
    max_bracket = max_operator == 'leq' and ']' or ')'
223
224
    return "{}{};{}{}".format(min_bracket, min_str, max_str, max_bracket)
225
226
227
def is_result_range_compliant(analysis):
228
    """Returns whether the result range from the analysis matches with the
229
    result range for the service counterpart defined in the Sample
230
    """
231
    if not IRequestAnalysis.providedBy(analysis):
232
        return True
233
234
    if IDuplicateAnalysis.providedBy(analysis):
235
        # Does not make sense to apply compliance to a duplicate, cause its
236
        # valid range depends on the result of the original analysis
237
        return True
238
239
    rr = analysis.getResultsRange()
240
    service_uid = rr.get("uid", None)
241
    if not api.is_uid(service_uid):
242
        return True
243
244
    # Compare with Sample
245
    sample = analysis.getRequest()
246
247
    # If no Specification is set, assume is compliant
248
    specification = sample.getRawSpecification()
249
    if not specification:
250
        return True
251
252
    # Compare with the Specification that was initially set to the Sample
253
    sample_rr = sample.getResultsRange(search_by=service_uid)
254
    if not sample_rr:
255
        # This service is not defined in Sample's ResultsRange, we
256
        # assume this *does not* break the compliance
257
        return True
258
259
    return rr == sample_rr
260
261
262
def is_analysis(brain_or_object):
263
    """Checks if the object is an analysis
264
265
    :param brain_or_object: A single catalog brain or content object
266
    :returns: True if the object is an analysis, False otherwise
267
    """
268
    analysis = api.get_object(brain_or_object)
269
    return IAnalysis.providedBy(analysis)
270
271
272
def is_reference_analysis(brain_or_object):
273
    """Checks if the object is a reference analysis
274
275
    :param brain_or_object: A single catalog brain or content object
276
    :returns: True if the object is a reference analysis, False otherwise
277
    """
278
    analysis = api.get_object(brain_or_object)
279
    return IReferenceAnalysis.providedBy(analysis)
280
281
282
def is_retracted(brain_or_object):
283
    """Checks if an analysis is retracted
284
285
    :param brain_or_object: A single catalog brain or content object
286
    :returns: True if the analysis is retracted, False otherwise
287
    """
288
    analysis = api.get_object(brain_or_object)
289
    if not is_analysis(analysis) and not is_reference_analysis(analysis):
290
        api.fail("{} is not supported. Needs to be IAnalysis or "
291
                 "IReferenceAnalysis".format(repr(analysis)))
292
    return IRetracted.providedBy(analysis)
293
294
295
def is_rejected(brain_or_object):
296
    """Checks if the analysis is rejected
297
298
    :param brain_or_object: A single catalog brain or content object
299
    :returns: True if the analysis is rejected, False otherwise
300
    """
301
    analysis = api.get_object(brain_or_object)
302
    if not is_analysis(analysis) and not is_reference_analysis(analysis):
303
        api.fail("{} is not supported. Needs to be IAnalysis or "
304
                 "IReferenceAnalysis".format(repr(analysis)))
305
    return IRejected.providedBy(analysis)
306
307
308
def is_retested(brain_or_object):
309
    """Checks if the analysis is retested
310
311
    :param brain_or_object: A single catalog brain or content object
312
    :returns: True if the analysis is retested, False otherwise
313
    """
314
    analysis = api.get_object(brain_or_object)
315
    if not is_analysis(analysis) and not is_reference_analysis(analysis):
316
        api.fail("{} is not supported. Needs to be IAnalysis or "
317
                 "IReferenceAnalysis".format(repr(analysis)))
318
    return analysis.isRetested()
319
320
321
def get_dependencies(brain_or_object, with_retests=False, recursive=False):
322
    """Returns the list of dependent analysis UIDs for the analysis passed in
323
324
    :param brain_or_object: A single catalog brain or content object
325
    :returns: List analysis objects that this analysis depends on
326
    """
327
    if not is_analysis(brain_or_object):
328
        return []
329
330
    dependencies = set()
331
    analysis = api.get_object(brain_or_object)
332
333
    # no calculation, no dependencies
334
    calc = analysis.getCalculation()
335
    if not calc:
336
        return []
337
338
    # get the sample (might be a partition)
339
    sample = analysis.getRequest()
340
    # get calculation dependencies
341
    service_deps = calc.getDependentServices()
342
    # get the keywords of the dependent services
343
    keywords = [s.getKeyword() for s in service_deps]
344
    # no dependencies to other services, nothing to do
345
    if not keywords:
346
        return []
347
    # collect the analyses
348
    dependencies.update(sample.getAnalyses(getKeyword=keywords))
349
350
    # calculate all dependencies for our dependencies
351
    if recursive:
352
        # iterate over all dependencies and get their dependencies
353
        for dep in list(dependencies):
354
            dependencies.update(get_dependencies(
355
                dep, with_retests=with_retests, recursive=recursive))
356
357
    if not with_retests:
358
        # filter out retracted, rejected and retested analyses
359
        def is_retest(analysis):
360
            return is_retracted(analysis) or is_rejected(analysis) \
361
                or is_retested(analysis)
362
        dependencies = filter(lambda d: not is_retest(d), dependencies)
0 ignored issues
show
introduced by
The variable is_retest does not seem to be defined in case BooleanNotNode on line 357 is False. Are you sure this can never be the case?
Loading history...
363
364
    return map(api.get_object, dependencies)
365
366
367
def get_dependents(brain_or_object, with_retests=False, recursive=False):
368
    """Returns the list of analysis UIDs that depend on the current
369
370
    :param brain_or_object: A single catalog brain or content object
371
    :returns: List of analysis object that depend on the current analysis
372
    """
373
    if not is_analysis(brain_or_object):
374
        return []
375
376
    dependents = set()
377
    analysis = api.get_object(brain_or_object)
378
379
    # get the service of the current analysis
380
    service = analysis.getAnalysisService()
381
382
    # get the sample (might be a partition)
383
    sample = analysis.getRequest()
384
385
    # get all analyses with calculations
386
    analyses_with_calcs = sample.getAnalyses(
387
        has_calculation=True, full_objects=True)
388
389
    # Now we check if we are part of any calculation
390
    for analysis in analyses_with_calcs:
391
        calc = analysis.getCalculation()
392
        if not calc:
393
            # in case the `has_calculation` index is not there yet
394
            continue
395
        dependencies = calc.getDependentServices()
396
        # check if our service is a dependency
397
        if service in dependencies:
398
            # remember the analysis that depends on us
399
            dependents.add(analysis)
400
401
    if recursive:
402
        for dep in list(dependents):
403
            dependents.update(get_dependents(
404
                dep, with_retests=with_retests, recursive=recursive))
405
406
    if not with_retests:
407
        # filter out retracted, rejected and retested analyses
408
        def is_retest(analysis):
409
            return is_retracted(analysis) or is_rejected(analysis) \
410
                or is_retested(analysis)
411
        dependents = filter(lambda d: not is_retest(d), dependents)
0 ignored issues
show
introduced by
The variable is_retest does not seem to be defined for all execution paths.
Loading history...
412
413
    return map(api.get_object, dependents)
414