Passed
Push — 2.x ( 4ac573...e592c3 )
by Ramon
07:11
created

AbstractAnalysis.isBelowLimitOfQuantification()   A

Complexity

Conditions 2

Size

Total Lines 10
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 10
rs 10
c 0
b 0
f 0
cc 2
nop 1
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2025 by its authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import cgi
22
import copy
23
import json
24
import math
25
from decimal import Decimal
26
27
from AccessControl import ClassSecurityInfo
28
from bika.lims import api
29
from bika.lims import bikaMessageFactory as _
30
from bika.lims import deprecated
31
from bika.lims import logger
32
from bika.lims.browser.fields import HistoryAwareReferenceField
33
from bika.lims.browser.fields import InterimFieldsField
34
from bika.lims.browser.fields import ResultRangeField
35
from bika.lims.browser.fields import UIDReferenceField
36
from bika.lims.browser.fields.uidreferencefield import get_backreferences
37
from bika.lims.browser.widgets import RecordsWidget
38
from bika.lims.config import LDL
39
from bika.lims.config import UDL
40
from bika.lims.content.abstractbaseanalysis import AbstractBaseAnalysis
41
from bika.lims.content.abstractbaseanalysis import schema
42
from bika.lims.interfaces import IDuplicateAnalysis
43
from bika.lims.utils import formatDecimalMark
44
from bika.lims.utils.analysis import format_numeric_result
45
from bika.lims.utils.analysis import get_significant_digits
46
from bika.lims.workflow import getTransitionActor
47
from bika.lims.workflow import getTransitionDate
48
from DateTime import DateTime
49
from Products.Archetypes.Field import IntegerField
50
from Products.Archetypes.Field import StringField
51
from Products.Archetypes.references import HoldingReference
52
from Products.Archetypes.Schema import Schema
53
from Products.CMFCore.permissions import View
54
from senaite.core.browser.fields.datetime import DateTimeField
55
from senaite.core.i18n import translate as t
56
from senaite.core.permissions import FieldEditAnalysisResult
57
from senaite.core.permissions import ViewResults
58
from six import string_types
59
60
# Link to the AnalysisService object that was used to create the analysis
AnalysisService = UIDReferenceField("AnalysisService")

# Attachments, either added manually through the UI or automatically when
# results are imported from a file supplied by an instrument
Attachment = UIDReferenceField(
    "Attachment",
    relationship="AnalysisAttachment",
    allowed_types=("Attachment",),
    multiValued=1,
)

# Holds the final result of the analysis
Result = StringField(
    "Result",
    write_permission=FieldEditAnalysisResult,
    read_permission=ViewResults,
)

# Updated to the current time whenever the result changes. Only the most
# recent capture date is kept here (it feeds the catalog values); the
# workflow review_history can be used to get all result capture dates
ResultCaptureDate = DateTimeField(
    "ResultCaptureDate",
    max="current",
    write_permission=FieldEditAnalysisResult,
    read_permission=View,
)

# The retracted analysis this analysis is a retest of
RetestOf = UIDReferenceField("RetestOf", relationship="AnalysisRetestOf")

# Operand (< or >) stored when the result is outside of the detection limits
# of the method or instrument. For routine analyses it is taken from the
# Result when the entered result explicitly starts with "<" or ">"
DetectionLimitOperand = StringField(
    "DetectionLimitOperand",
    write_permission=FieldEditAnalysisResult,
    read_permission=View,
)

# ID of the logged in user who submitted the result for this Analysis
Analyst = StringField("Analyst")

# Actual uncertainty for the result of this analysis, populated from the
# ranges specified in the analysis service when the result is submitted
Uncertainty = StringField(
    "Uncertainty",
    precision=10,
    write_permission=FieldEditAnalysisResult,
    read_permission=View,
)

# Number of verifications required before the analysis is transitioned to a
# 'verified' state. The value is set automatically when the analysis is
# created, based on the value of the NumberOfRequiredVerifications property
# of the Analysis Service
NumberOfRequiredVerifications = IntegerField(
    "NumberOfRequiredVerifications",
    default=1,
)

# Routine Analyses and Reference Analyses keep a versioned link to the
# calculation at creation time
Calculation = HistoryAwareReferenceField(
    "Calculation",
    referenceClass=HoldingReference,
    relationship="AnalysisCalculation",
    allowed_types=("Calculation",),
    write_permission=FieldEditAnalysisResult,
    read_permission=View,
)

# InterimFields are defined in Calculations, Services and Analyses:
# - in Analysis Services the defaults are taken from the Calculation
# - in Analyses the defaults are taken from the Analysis Service
# When instrument results are imported, the values in the analysis are
# overridden before the calculation is performed
InterimFields = InterimFieldsField(
    "InterimFields",
    schemata="Method",
    write_permission=FieldEditAnalysisResult,
    read_permission=View,
    widget=RecordsWidget(
        label=_("Calculation Interim Fields"),
        description=_(
            "Values can be entered here which will override the defaults "
            "specified in the Calculation Interim Fields."),
    ),
)

# Results Range that applies to this analysis
ResultsRange = ResultRangeField("ResultsRange", required=0)

# Extend a copy of the base schema with the fields declared above.
# NumberOfRequiredVerifications overrides the one from AbstractBaseClass
schema = schema.copy() + Schema((
    AnalysisService,
    Analyst,
    Attachment,
    DetectionLimitOperand,
    NumberOfRequiredVerifications,
    Result,
    ResultCaptureDate,
    RetestOf,
    Uncertainty,
    Calculation,
    InterimFields,
    ResultsRange,
))
179
180
181
class AbstractAnalysis(AbstractBaseAnalysis):
182
    security = ClassSecurityInfo()
183
    displayContentsTab = False
184
    schema = schema
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable schema does not seem to be defined.
Loading history...
185
186
    @deprecated('[1705] Currently returns the Analysis object itself.  If you '
187
                'need to get the service, use getAnalysisService instead')
188
    @security.public
189
    def getService(self):
        """Return this very analysis object.

        Kept for backwards compatibility only: the analysis itself is handed
        back, not the service it was created from.
        """
        analysis = self
        return analysis
191
192
    def getServiceUID(self):
        """Return the UID of the service this analysis is linked to, as
        provided by the raw accessor of the AnalysisService field.
        """
        service_uid = self.getRawAnalysisService()
        return service_uid
196
197
    @security.public
198
    def getNumberOfVerifications(self):
        """Return how many verifications have been recorded so far, that is
        the number of entries returned by `getVerificators`.
        """
        verifiers = self.getVerificators()
        return len(verifiers)
200
201
    @security.public
202
    def getNumberOfRemainingVerifications(self):
        """Return the number of verifications still pending.

        This is the difference between the required verifications and those
        already done, never going below zero.
        """
        required = self.getNumberOfRequiredVerifications()
        pending = required - self.getNumberOfVerifications()
        return pending if pending > 0 else 0
208
209
    # TODO Workflow - analysis . Remove?
210
    @security.public
211
    def getLastVerificator(self):
        """Return the last entry of `getVerificators`, or None when there
        are no verifiers or the last entry is empty.
        """
        verifiers = self.getVerificators()
        if not verifiers:
            return None
        # an empty (falsy) last entry is reported as None as well
        return verifiers[-1] or None
214
215
    @security.public
216
    def getVerificators(self):
        """Returns the user ids of the users that verified this analysis

        An event of the review history counts as a verification when its
        review state is 'verified', or when its action is one of 'retest',
        'verify' or 'multi_verify' (transitions whose end state is not
        necessarily 'verified').
        """
        tracked_actions = ("retest", "verify", "multi_verify")

        def is_verification(event):
            # any transition ending up in the 'verified' state
            if event.get("review_state") == "verified":
                return True
            # some transitions whose end state is not 'verified'
            return event.get("action") in tracked_actions

        history = api.get_review_history(self, rev=False)
        return [event["actor"] for event in history if is_verification(event)]
229
230
    @security.public
231
    def getDefaultUncertainty(self):
        """Return the uncertainty that applies to the current result
        according to the uncertainty ranges returned by `getUncertainties`.

        The first range whose min/max contains the result is used. Its error
        value is either an absolute value or, when it ends with "%", a
        percentage of the result.

        :return: the uncertainty as a string, or None if the result is not
            floatable, there are no ranges, no range matches, or the
            percentage cannot be converted to a number
        """
        raw_result = self.getResult()
        if not api.is_floatable(raw_result):
            return None

        ranges = self.getUncertainties()
        if not ranges:
            return None

        value = api.to_float(raw_result)
        for entry in ranges:
            # boundaries of the range, falling back to 0 if not floatable
            lower = api.to_float(entry["intercept_min"], default=0)
            upper = api.to_float(entry["intercept_max"], default=0)
            if not lower <= value <= upper:
                # result is outside of the range of this uncertainty
                continue

            error = str(entry["errorvalue"]).strip()
            if not error.endswith("%"):
                error = api.to_float(error, default=0)
            else:
                # uncertainty expressed as a percentage of the result
                try:
                    error = value / 100 * float(error.replace("%", ""))
                except ValueError:
                    return None

            # convert back to string value
            return api.float_to_string(error, default=None)

        return None
267
268
    @security.public
269
    def getUncertainty(self):
        """Returns the uncertainty for this analysis.

        The value stored in the Uncertainty field is returned when it is set
        and manual uncertainty is allowed (`getAllowManualUncertainty`).
        Otherwise, the value from `getDefaultUncertainty` is returned.

        Returns None if the current result is outside of the quantifiable
        range: the measurement is then not reliable or accurate enough to
        confidently quantify the analyte, so an uncertainty makes no sense.
        """
        if self.isOutsideTheQuantifiableRange():
            return None

        manual = self.getField("Uncertainty").get(self)
        if not (manual and self.getAllowManualUncertainty()):
            # no manual value in use: rely on the default uncertainty
            return self.getDefaultUncertainty()

        # the uncertainty has been manually set on results introduction
        return api.float_to_string(manual, default=None)
292
293
    @security.public
294
    def setUncertainty(self, unc):
        """Sets the uncertainty for this analysis

        If the current result is outside of the quantifiable range, the
        given value is discarded and None is used instead. The value is
        converted to a string before it is stored in the Uncertainty field.
        """
        value = None if self.isOutsideTheQuantifiableRange() else unc
        uncertainty_field = self.getField("Uncertainty")
        uncertainty_field.set(self, api.float_to_string(value, default=None))
305
306
    @security.public
307
    def setDetectionLimitOperand(self, value):
        """Set detection limit operand for this analysis

        Allowed detection limit operands are `<` and `>`; any other value is
        stored as an empty string. Nothing is done when neither the manual
        detection limit nor the detection limit selector is enabled.

        Setting a valid operand has side effects: the uncertainty is flushed
        and, if there is no floatable result or manual detection limits are
        not allowed, the result is replaced by the lower/upper detection
        limit.
        """
        manual_dl = self.getAllowManualDetectionLimit()
        selector = self.getDetectionLimitSelector()
        if not (manual_dl or selector):
            # Don't allow the user to set the limit operand if manual
            # assignment is not allowed and selector is not visible
            return

        # Changing the detection limit operand has a side effect on the result
        result = self.getResult()
        if value not in (LDL, UDL):
            value = ""
        else:
            # flush uncertainty
            self.setUncertainty("")

            # Without a previous result, or if the user is not allowed to
            # manually set the detection limit, the result becomes the
            # system default LDL/UDL
            if not (api.is_floatable(result) and manual_dl):
                if value == LDL:
                    result = self.getLowerDetectionLimit()
                else:
                    result = self.getUpperDetectionLimit()

        # Store the result first, then the detection limit operand
        self.getField("Result").set(self, result)
        self.getField("DetectionLimitOperand").set(self, value)
342
343
    # Method getLowerDetectionLimit overrides method of class BaseAnalysis
344
    @security.public
345
    def getLowerDetectionLimit(self):
        """Returns the Lower Detection Limit (LDL) that applies to this
        analysis in particular.

        When the detection limit operand of this analysis is the LDL operand
        and the result is floatable, the result itself is the LDL. In any
        other case, the value from AbstractBaseAnalysis is returned.
        """
        if not self.isLowerDetectionLimit():
            return AbstractBaseAnalysis.getLowerDetectionLimit(self)

        result = self.getResult()
        if api.is_floatable(result):
            return result

        # flagged as LDL, but the result cannot be used as a number
        logger.warn("The result for the analysis %s is a lower detection "
                    "limit, but not floatable: '%s'. Returning AS's "
                    "default LDL." % (self.id, result))
        return AbstractBaseAnalysis.getLowerDetectionLimit(self)
360
361
    # Method getUpperDetectionLimit overrides method of class BaseAnalysis
362
    @security.public
363
    def getUpperDetectionLimit(self):
        """Returns the Upper Detection Limit (UDL) that applies to this
        analysis in particular.

        When the detection limit operand of this analysis is the UDL operand
        and the result is floatable, the result itself is the UDL. In any
        other case, the value from AbstractBaseAnalysis is returned.
        """
        if not self.isUpperDetectionLimit():
            return AbstractBaseAnalysis.getUpperDetectionLimit(self)

        result = self.getResult()
        if api.is_floatable(result):
            return result

        # flagged as UDL, but the result cannot be used as a number
        logger.warn("The result for the analysis %s is an upper detection "
                    "limit, but not floatable: '%s'. Returning AS's "
                    "default UDL." % (self.id, result))
        return AbstractBaseAnalysis.getUpperDetectionLimit(self)
378
379
    @security.public
380
    def isBelowLowerDetectionLimit(self):
        """Returns True if the result is below the Lower Detection Limit or
        if Lower Detection Limit has been manually set

        The checks are done in this order: the detection limit operand is
        the LDL operand, the result text starts with the LDL operand, or the
        floatable result is lower than the lower detection limit.
        """
        if self.isLowerDetectionLimit():
            return True

        result = self.getResult()
        if result and str(result).strip().startswith(LDL):
            return True

        if not api.is_floatable(result):
            return False

        ldl = self.getLowerDetectionLimit()
        return api.to_float(result) < api.to_float(ldl, 0.0)
396
397
    @security.public
398
    def isAboveUpperDetectionLimit(self):
        """Returns True if the result is above the Upper Detection Limit or
        if Upper Detection Limit has been manually set

        The checks are done in this order: the detection limit operand is
        the UDL operand, the result text starts with the UDL operand, or the
        floatable result is greater than the upper detection limit.
        """
        if self.isUpperDetectionLimit():
            return True

        result = self.getResult()
        if result and str(result).strip().startswith(UDL):
            return True

        if not api.is_floatable(result):
            return False

        udl = self.getUpperDetectionLimit()
        return api.to_float(result) > api.to_float(udl, 0.0)
414
415
    @security.public
416
    def getLowerLimitOfQuantification(self):
        """Returns the Lower Limit of Quantification (LLOQ) for the current
        analysis.

        If the LLOQ stored in the field is lower than the Lower Limit of
        Detection (LLOD), the LLOD is returned instead, so the value always
        respects the detection threshold.
        """
        llod = self.getLowerDetectionLimit()
        lloq = self.getField("LowerLimitOfQuantification").get(self)
        if api.to_float(lloq) < api.to_float(llod):
            return llod
        return lloq
425
426
    @security.public
427
    def getUpperLimitOfQuantification(self):
        """Returns the Upper Limit of Quantification (ULOQ) for the current
        analysis.

        If the ULOQ stored in the field is greater than the Upper Limit of
        Detection (ULOD), the ULOD is returned instead, so the value always
        respects the detection threshold.
        """
        ulod = self.getUpperDetectionLimit()
        uloq = self.getField("UpperLimitOfQuantification").get(self)
        if api.to_float(uloq) > api.to_float(ulod):
            return ulod
        return uloq
436
437
    @security.public
438
    def isBelowLimitOfQuantification(self):
        """Returns whether the result is below the Lower Limit of
        Quantification (LLOQ). A non-floatable result is never below.
        """
        result = self.getResult()
        if api.is_floatable(result):
            lloq = self.getLowerLimitOfQuantification()
            return api.to_float(result) < api.to_float(lloq)
        return False
447
448
    @security.public
449
    def isAboveLimitOfQuantification(self):
        """Returns whether the result is above the Upper Limit of
        Quantification (ULOQ). A non-floatable result is never above.
        """
        result = self.getResult()
        if api.is_floatable(result):
            uloq = self.getUpperLimitOfQuantification()
            return api.to_float(result) > api.to_float(uloq)
        return False
458
459
    @security.public
460
    def isOutsideTheQuantifiableRange(self):
        """Returns whether the result falls outside the quantifiable range
        specified by the Lower Limit of Quantification (LLOQ) and Upper Limit
        of Quantification (ULOQ).
        """
        # the upper limit is only checked if the result is not below the LLOQ
        return bool(
            self.isBelowLimitOfQuantification()
            or self.isAboveLimitOfQuantification()
        )
470
471
    # TODO: REMOVE:  nowhere used
472
    @deprecated("This Method will be removed in version 2.5")
473
    @security.public
474
    def getDetectionLimits(self):
        """Returns a two-value list with the limits of detection (LDL and
        UDL, in this order) that apply to this analysis in particular.

        Both values are converted to float, with 0.0 used for values that
        cannot be converted.
        """
        limits = (self.getLowerDetectionLimit(), self.getUpperDetectionLimit())
        return [api.to_float(limit, 0.0) for limit in limits]
483
484
    @security.public
485
    def isLowerDetectionLimit(self):
        """Returns True if the detection limit operand of this analysis is
        the Lower Detection Limit operand, meaning that the result represents
        a Lower Detection Limit. Otherwise, returns False
        """
        operand = self.getDetectionLimitOperand()
        return operand == LDL
490
491
    @security.public
492
    def isUpperDetectionLimit(self):
        """Returns True if the detection limit operand of this analysis is
        the Upper Detection Limit operand, meaning that the result represents
        an Upper Detection Limit. Otherwise, returns False
        """
        operand = self.getDetectionLimitOperand()
        return operand == UDL
497
498
    @security.public
499
    def getDependents(self):
        """Return a list of analyses who depend on us to calculate their
        result.

        Not implemented at this level.

        :raises NotImplementedError: always
        """
        message = "getDependents is not implemented."
        raise NotImplementedError(message)
503
504
    @security.public
505
    def getDependencies(self, with_retests=False):
        """Return a list of siblings who we depend on to calculate our result.

        Not implemented at this level.

        :param with_retests: If false, siblings with retests are dismissed
        :type with_retests: bool
        :return: Analyses the current analysis depends on
        :rtype: list of IAnalysis
        :raises NotImplementedError: always
        """
        message = "getDependencies is not implemented."
        raise NotImplementedError(message)
513
514
    @security.public
515
    def setResult(self, value):
        """Validate and set a value into the Result field, taking into
        account the Detection Limits.

        Lists and tuples are stored JSON-encoded. Unless a string result is
        expected, a leading detection limit operand (`<` or `>`) is stripped
        off from the value and handled through `setDetectionLimitOperand`.

        :param value: is expected to be a string. Lists and tuples are
            accepted as well.
        """
        # Convert to list if the analysis has result options set with multi.
        # The list is built explicitly instead of with `filter`, because
        # `filter` returns a lazy iterator in Python 3 that would not be
        # detected as a list below
        if self.getResultOptions() and "multi" in self.getResultType():
            if not isinstance(value, (list, tuple)):
                value = [value] if value else []

        # Handle list results
        if isinstance(value, (list, tuple)):
            value = json.dumps(value)

        # Ensure result integrity regards to None, empty and 0 values
        val = str("" if not value and value != 0 else value).strip()

        # Check if an string result is expected
        string_result = self.getStringResult()

        # UDL/LDL directly entered in the results field
        if not string_result and val[:1] in [LDL, UDL]:
            # Strip off the detection limit operand from the result
            operand = val[0]
            val = val.replace(operand, "", 1).strip()

            # Result becomes the detection limit
            selector = self.getDetectionLimitSelector()
            allow_manual = self.getAllowManualDetectionLimit()
            if any([selector, allow_manual]):

                # Set the detection limit operand
                self.setDetectionLimitOperand(operand)

                if not allow_manual:
                    # Manual introduction of DL is not permitted
                    if operand == LDL:
                        # Result is default LDL
                        val = self.getLowerDetectionLimit()
                    else:
                        # Result is default UDL
                        val = self.getUpperDetectionLimit()

        elif not self.getDetectionLimitSelector():
            # User cannot choose the detection limit from a selection list,
            # but might be allowed to manually enter the dl with the result.
            # If so, reset the detection limit operand, because the
            # previously entered result might be a DL, but the current
            # one is not
            self.setDetectionLimitOperand("")

        # Set the result field
        self.getField("Result").set(self, val)
567
568
    @security.public
569
    def calculateResult(self, override=False, cascade=False):
        """Calculates the result for the current analysis if it depends of
        other analysis/interim fields. Otherwise, do nothing

        The formula of the assigned calculation is resolved with the values
        of the interim fields (from the calculation and from this analysis)
        and with the results of the analyses this one depends on, and is
        then evaluated.

        :param override: if False, an already existing result is kept and
            nothing is calculated
        :param cascade: if True, dependencies without a result are asked to
            calculate their own result first
        :return: True if the result was written: the calculated value, "0/0"
            on division by zero, "NA" when the formula cannot be evaluated,
            or an empty value when a previous result is flushed because a
            dependency has no result. False otherwise.
        :rtype: bool
        """
        if self.getResult() and override is False:
            return False

        calc = self.getCalculation()
        if not calc:
            return False

        # get the formula from the calculation
        formula = calc.getMinifiedFormula()

        # Include the current context UID in the mapping, so it can be passed
        # as a param in built-in functions, like 'get_result(%(context_uid)s)'
        mapping = {"context_uid": '"{}"'.format(self.UID())}

        # Interims' priority order (from low to high):
        # Calculation < Analysis
        interims = calc.getInterimFields() + self.getInterimFields()

        # Add interims to mapping
        for i in interims:

            interim_keyword = i.get("keyword")
            if not interim_keyword:
                continue

            # skip unset values
            interim_value = i.get("value", "")
            if interim_value == "":
                continue

            # Convert to floatable if necessary
            if api.is_floatable(interim_value):
                interim_value = float(interim_value)
            else:
                # The formula is a string itself, so string interim values
                # must be wrapped in between inverted commas to remain
                # string literals once interpolated.
                #
                # E.g. formula = '"ok" if %(var)s == "example_value" else "not ok"'
                #
                # with interim_value = 'example_value' the formula becomes
                # '"ok" if example_value == "example_value" else "not ok"'
                # -> Error
                #
                # with interim_value = '"example_value"' the formula becomes
                # '"ok" if "example_value" == "example_value" else "not ok"'
                # -> Correct
                interim_value = '"{}"'.format(interim_value)

            # Convert 'Numeric' interim values using `float`. Convert the
            # rest using `str`
            converter = "s" if i.get("result_type") else "f"
            formula = formula.replace(
                "[" + interim_keyword + "]",
                "%(" + interim_keyword + ")" + converter
            )

            mapping[interim_keyword] = interim_value

        # Add dependencies results to mapping
        dependencies = self.getDependencies()
        for dependency in dependencies:
            result = dependency.getResult()
            # check if the dependency is a string result
            str_result = dependency.getStringResult()
            keyword = dependency.getKeyword()

            # Dependency without results found
            if not result and cascade:
                # Try to calculate the dependency result
                dependency.calculateResult(override, cascade)
                result = dependency.getResult()

            if result:
                try:
                    # we need to quote a string result because of the `eval`
                    # below
                    if str_result:
                        result = '"%s"' % result
                    else:
                        result = float(str(result))
                    ldl = dependency.getLowerDetectionLimit()
                    udl = dependency.getUpperDetectionLimit()
                    lloq = dependency.getLowerLimitOfQuantification()
                    uloq = dependency.getUpperLimitOfQuantification()
                    bdl = dependency.isBelowLowerDetectionLimit()
                    adl = dependency.isAboveUpperDetectionLimit()
                    bloq = dependency.isBelowLimitOfQuantification()
                    aloq = dependency.isAboveLimitOfQuantification()
                    mapping[keyword] = result
                    mapping['%s.%s' % (keyword, 'RESULT')] = result
                    mapping['%s.%s' % (keyword, 'LDL')] = api.to_float(ldl, 0.0)
                    mapping['%s.%s' % (keyword, 'UDL')] = api.to_float(udl, 0.0)
                    # LOQ and LLOQ are both mapped to the lower limit
                    mapping['%s.%s' % (keyword, 'LOQ')] = api.to_float(lloq, 0.0)
                    mapping['%s.%s' % (keyword, 'LLOQ')] = api.to_float(lloq, 0.0)
                    mapping['%s.%s' % (keyword, 'ULOQ')] = api.to_float(uloq, 0.0)
                    mapping['%s.%s' % (keyword, 'BELOWLDL')] = int(bdl)
                    mapping['%s.%s' % (keyword, 'ABOVEUDL')] = int(adl)
                    # BELOWLOQ and BELOWLLOQ are both mapped to the same flag
                    mapping['%s.%s' % (keyword, 'BELOWLOQ')] = int(bloq)
                    mapping['%s.%s' % (keyword, 'BELOWLLOQ')] = int(bloq)
                    mapping['%s.%s' % (keyword, 'ABOVEULOQ')] = int(aloq)
                except (TypeError, ValueError):
                    return False

                # replace placeholder -> formatting string
                # https://docs.python.org/2.7/library/stdtypes.html?highlight=built#string-formatting-operations
                converter = "s" if str_result else "f"
                formula = formula.replace(
                    "[" + keyword + "]", "%(" + keyword + ")" + converter)
            else:
                # flush eventual previously set result
                if self.getResult():
                    self.setResult("")
                    return True

                return False

        # convert any remaining placeholders, e.g. from interims etc.
        # NOTE: we assume remaining values are all floatable!
        formula = formula.replace("[", "%(").replace("]", ")f")

        # Calculate
        # NOTE(review): the formula is evaluated with `eval`. The first call
        # only interpolates the mapping; the second one executes the formula
        # with the globals provided by the calculation. Presumably formulas
        # can only be edited by trusted users -- confirm.
        try:
            formula = eval("'%s'%%mapping" % formula,
                           {"__builtins__": None,
                            'math': math,
                            'context': self},
                           {'mapping': mapping})
            result = eval(formula, calc._getGlobals())
        except ZeroDivisionError:
            self.setResult('0/0')
            return True
        except (KeyError, TypeError, ImportError) as e:
            # `str(e)` instead of `e.message`: exceptions have no `message`
            # attribute in Python 3
            msg = "Cannot eval formula ({}): {}".format(str(e), formula)
            logger.error(msg)
            self.setResult("NA")
            return True

        self.setResult(str(result))
        return True
708
709
    @security.public
710
    def getVATAmount(self):
        """Compute the VAT amount without member discount.

        The VAT value is applied as a percentage of the price.

        :return: the VAT amount as a Decimal
        """
        vat = self.getVAT()
        price = self.getPrice()
        net = Decimal(price)
        rate = Decimal(vat)
        return net * rate / 100
717
718
    @security.public
719
    def getTotalPrice(self):
        """Obtain the total price without client's member discount, that is
        the price plus the VAT amount.

        :return: the total price as a Decimal
        """
        net = Decimal(self.getPrice())
        vat_amount = Decimal(self.getVATAmount())
        return net + vat_amount
725
726
    @security.public
727
    def getDuration(self):
        """Returns the time in minutes taken for this analysis.

        If the analysis is not yet 'ready to process' (no start process
        date), returns 0.
        If the analysis has a verification date:
            duration = date_verified - date_start_process
        Otherwise (still in progress):
            duration = current_datetime - date_start_process

        :return: time in minutes taken for this analysis
        """
        started = self.getStartProcessDate()
        if started:
            finished = self.getDateVerified() or DateTime()
            # the date difference is scaled to minutes (assumes the
            # subtraction yields days -- TODO confirm)
            return (finished - started) * 24 * 60

        # The analysis is not yet ready to be processed
        return 0
746
747
    @security.public
748
    def getEarliness(self):
        """Return the minutes left before this analysis reaches its maximum
        turnaround time (TAT).

        Returns zero when no turnaround time is set or when it amounts to
        zero minutes. Otherwise:

            earliness = max_turnaround_time - duration

        A negative earliness means the analysis is late.

        :return: the remaining time in minutes before the analysis reaches TAT
        :rtype: number
        """
        turnaround = self.getMaxTimeAllowed()
        # No turnaround time set, or a turnaround time of zero minutes
        if not turnaround or api.to_minutes(**turnaround) == 0:
            return 0
        return api.to_minutes(**turnaround) - self.getDuration()
764
765
    @security.public
766
    def isLateAnalysis(self):
        """Tell whether this analysis has exceeded its maximum turnaround
        time.

        The analysis is late only when its earliness is negative; an
        earliness of zero or more is reported as not late.

        :return: true if the analysis is late
        :rtype: bool
        """
        remaining = self.getEarliness()
        return remaining < 0
775
776
    @security.public
777
    def getLateness(self):
        """Return the minutes by which this analysis exceeds its maximum
        turnaround time.

        This is the earliness with the sign flipped, so a negative value
        means the analysis is not late.

        :return: the time in minutes that exceeds the maximum turnaround time
        :rtype: number
        """
        earliness = self.getEarliness()
        return -earliness
786
787
    @security.public
788
    def isInstrumentAllowed(self, instrument):
        """Tell whether the given instrument may be assigned to this analysis.

        :param instrument: the instrument, in any form api.get_uid() can
            resolve to a UID
        :return: True if the instrument UID is among the raw allowed
            instruments of this analysis
        :rtype: bool
        """
        return api.get_uid(instrument) in self.getRawAllowedInstruments()
797
798
    @security.public
799
    def isMethodAllowed(self, method):
        """Tell whether this analysis can follow the given method.

        :param method: the method, in any form api.get_uid() can resolve to
            a UID
        :return: True if the method UID is among the raw allowed methods of
            this analysis
        :rtype: bool
        """
        return api.get_uid(method) in self.getRawAllowedMethods()
808
809
    @security.public
810
    def getAllowedMethods(self):
        """Return the methods allowed for this analysis.

        The methods are taken from the analysis service this analysis
        relies on; without a service, no method is allowed.

        :return: A list with the methods allowed for this analysis
        :rtype: list of Methods
        """
        service = self.getAnalysisService()
        return service.getMethods() if service else []
823
824
    @security.public
825
    def getRawAllowedMethods(self):
        """Return the UIDs of the methods allowed for this analysis.

        :return: the raw methods of the analysis service, or an empty list
            if this analysis has no service
        """
        service = self.getAnalysisService()
        return service.getRawMethods() if service else []
832
833
    @security.public
834
    def getAllowedInstruments(self):
        """Return the instruments allowed for this analysis.

        The instruments are taken from the analysis service; without a
        service, no instrument is allowed.

        :return: A list of instruments allowed for this Analysis
        :rtype: list of instruments
        """
        service = self.getAnalysisService()
        return service.getInstruments() if service else []
844
845
    @security.public
846
    def getRawAllowedInstruments(self):
        """Return the UIDs of the instruments allowed for this analysis.

        :return: the raw instruments of the analysis service, or an empty
            list if this analysis has no service
        """
        service = self.getAnalysisService()
        return service.getRawInstruments() if service else []
853
854
    @security.public
855
    def getFormattedResult(self, specs=None, decimalmark='.', sciformat=1,
                           html=True):
        """Return the result of this analysis formatted for display.

        The rules below are checked in order and the first one that applies
        determines the returned value:

        1. If result options are set and the result matches one of them (or
           is a JSON list of option values), return the matching text(s)
        2. If the result type is StringResult, return it without formatting
        3. If the result is a detection limit, return '< LDL' or '> UDL'
        4. If the result is not floatable, return it without formatting
        5. If the specs have hidemin or hidemax set and the result is out
           of that range, render the result as '<min' or '>max'
        6. If the result is below the Lower Limit of Detection, render
           "Not detected" or, when LLOD equals LLOQ, '< LLOD'
        7. If the result is below the Lower Limit of Quantification, render
           "Detected but < LLOQ"
        8. If the result is above the Upper Limit of Quantification, render
           '> ULOQ'
        9. Otherwise, render the numerical value

        :param specs: Optional result specifications, a dictionary as follows:
            {'min': <min_val>,
             'max': <max_val>,
             'error': <error>,
             'hidemin': <hidemin_val>,
             'hidemax': <hidemax_val>}
            If empty, the results range of the analysis is used.
        :param decimalmark: The string to be used as a decimal separator.
            default is '.'
        :param sciformat: format code for the scientific notation, handed
            over to format_numeric_result:
                          1. The sci notation has to be formatted as aE^+b
                          2. The sci notation has to be formatted as a·10^b
                          3. As 2, but with super html entity for exp
                          4. The sci notation has to be formatted as a·10^b
                          5. As 4, but with super html entity for exp
                          By default 1
        :param html: if true, returns an string with the special characters
            escaped: e.g: '<' and '>' (LDL and UDL for results like < 23.4).
        """
        result = self.getResult()

        # Result options: render the text of the selected option(s)
        options = self.getResultOptions()
        if options:
            # Map each option value (as a string) to its display text
            texts_by_value = dict(
                (str(option["ResultValue"]), option["ResultText"])
                for option in options
            )

            # The result may hold the value of a single option ...
            single = texts_by_value.get(str(result))
            if single:
                return single

            # ... or a JSON list with several values, e.g. '["2", "1"]'
            try:
                labels = [texts_by_value.get(str(item))
                          for item in json.loads(result)]
                return "<br/>".join([label for label in labels if label])
            except (ValueError, TypeError):
                # Not a list of options: fall through to the other rules
                pass

        # String results are rendered as they are
        if self.getStringResult():
            if html:
                result = cgi.escape(result).replace("\n", "<br/>")
            return result

        # Detection limits are rendered as '< LDL' or '> UDL'
        operand = self.getDetectionLimitOperand()
        if operand:
            try:
                number = api.float_to_string(float(result))
                formatted = formatDecimalMark(number, decimalmark)
                prefix = cgi.escape(operand) if html else operand
                return '%s %s' % (prefix, formatted)
            except (TypeError, ValueError):
                logger.warn(
                    "The result for the analysis %s is a detection limit, "
                    "but not floatable: %s" % (self.id, result))
                return formatDecimalMark(result, decimalmark=decimalmark)

        # Results that are not floatable only get the decimal mark applied
        try:
            result = float(result)
        except (TypeError, ValueError):
            return formatDecimalMark(result, decimalmark=decimalmark)

        # Check the result against the hidemin/hidemax of the specs
        specs = specs or self.getResultsRange()
        hidemin = specs.get('hidemin', '')
        hidemax = specs.get('hidemax', '')
        try:
            belowmin = bool(hidemin) and result < float(hidemin)
        except (TypeError, ValueError):
            belowmin = False
        try:
            abovemax = bool(hidemax) and result > float(hidemax)
        except (TypeError, ValueError):
            abovemax = False

        # Below min with hidemin set: render '<min'
        if belowmin:
            text = formatDecimalMark('< %s' % hidemin, decimalmark)
            return text.replace('< ', '&lt; ', 1) if html else text

        # Above max with hidemax set: render '>max'
        if abovemax:
            text = formatDecimalMark('> %s' % hidemax, decimalmark)
            return text.replace('> ', '&gt; ', 1) if html else text

        # Lower Limits of Detection and Quantification (LLOD and LLOQ)
        llod = api.to_float(self.getLowerDetectionLimit())
        lloq = api.to_float(self.getLowerLimitOfQuantification())
        if result < llod:
            if llod != lloq:
                # Display "Not detected"
                text = t(_("result_below_llod", default="Not detected"))
            else:
                # Display < LLOD
                text = formatDecimalMark(
                    "< %s" % api.float_to_string(llod), decimalmark)
            return cgi.escape(text) if html else text

        if result < lloq:
            limit = formatDecimalMark(api.float_to_string(lloq), decimalmark)
            text = t(_("result_below_lloq", default="Detected but < ${LLOQ}",
                       mapping={"LLOQ": limit}))
            return cgi.escape(text) if html else text

        # Upper Limit of Quantification (ULOQ)
        uloq = api.to_float(self.getUpperLimitOfQuantification())
        if result > uloq:
            text = formatDecimalMark(
                '> %s' % api.float_to_string(uloq), decimalmark)
            return cgi.escape(text) if html else text

        # Render numerical values
        return format_numeric_result(
            self, decimalmark=decimalmark, sciformat=sciformat)
989
990
    @security.public
991
    def getPrecision(self):
        """Returns the precision for the Analysis.

        - If neither "Allow manual uncertainty" nor "Calculate precision
          from uncertainty" are set, returns the fixed precision stored in
          the "Precision" field.

        - If any of them is set but the uncertainty is not floatable,
          returns the fixed precision as well.

        - If the uncertainty is zero, the precision is the number of
          decimals of the result itself: a value without uncertainty is
          written down as it is. If the result is missing or not floatable,
          returns the fixed precision.

        - Otherwise, the precision is the significant digit of the
          uncertainty, once rounded.

        Further information at AbstractBaseAnalysis.getPrecision()

        :return: the number of decimals to use for the result
        """
        fixed_precision = self.getField("Precision").get(self)
        allow_manual = self.getAllowManualUncertainty()
        from_uncertainty = self.getPrecisionFromUncertainty()
        if not (allow_manual or from_uncertainty):
            return fixed_precision

        # if no uncertainty rely on the fixed precision
        uncertainty = self.getUncertainty()
        if not api.is_floatable(uncertainty):
            return fixed_precision

        uncertainty = api.to_float(uncertainty)
        if uncertainty == 0:
            # calculate the precision from the result
            try:
                result = str(float(self.getResult()))
            except (TypeError, ValueError):
                # result missing (None) or not floatable: float() raises
                # TypeError for None and ValueError for non-numeric strings
                return fixed_precision
            # Position of the decimal point counted from the right
            return result[::-1].find('.')

        # Get the 'raw' significant digits from uncertainty
        sig_digits = get_significant_digits(uncertainty)
        # Round the uncertainty to its significant digit.
        # Needed because the precision for the result has to be based on
        # the *rounded* uncertainty. Note the following for a given
        # uncertainty value:
        #   >>> round(0.09404, 2)
        #   0.09
        #   >>> round(0.09504, 2)
        #   0.1
        # The precision when the uncertainty is 0.09504 is not 2, but 1
        uncertainty = abs(round(uncertainty, sig_digits))
        # Return the significant digit to apply
        return get_significant_digits(uncertainty)
1050
1051
    @security.public
1052
    def getAnalyst(self):
        """Return the analyst of this analysis.

        The first non-empty value wins: the analyst stored in the "Analyst"
        field, the analyst assigned through the worksheet, the user who
        submitted the result. Returns an empty string if none is available.
        """
        stored = self.getField("Analyst").get(self)
        return (stored or self.getAssignedAnalyst()
                or self.getSubmittedBy() or "")
1059
1060
    @security.public
1061
    def getAssignedAnalyst(self):
        """Return the analyst of the worksheet this analysis is assigned to.

        Returns an empty string if the analysis is not assigned to any
        worksheet or the worksheet has no analyst.
        """
        sheet = self.getWorksheet()
        return (sheet.getAnalyst() or "") if sheet else ""
1069
1070
    @security.public
1071
    def getSubmittedBy(self):
        """Return the actor of the 'submit' transition of this analysis.

        :return: whatever getTransitionActor reports for 'submit';
            presumably the id of the user who last submitted the result
        """
        submitter = getTransitionActor(self, 'submit')
        return submitter
1078
1079
    @security.public
1080
    def getDateSubmitted(self):
        """Return the date of the 'submit' transition of this analysis.

        :return: the time the result was submitted
        :rtype: DateTime
        """
        submitted = getTransitionDate(self, 'submit', return_as_datetime=True)
        return submitted
1086
1087
    @security.public
1088
    def getDateVerified(self):
        """Return the date of the 'verify' transition of this analysis.

        :return: the time the analysis was verified; presumably None if the
            analysis has not been verified yet
        :rtype: DateTime
        """
        verified = getTransitionDate(self, 'verify', return_as_datetime=True)
        return verified
1095
1096
    @security.public
1097
    def getStartProcessDate(self):
        """Return the date time from which this analysis can be processed.

        Here this is the creation date of the object. Subclasses might
        return a different date depending on the type of analysis (e.g.
        "Date Received" for routine analyses).

        :return: Date time when the analysis is ready to be processed.
        :rtype: DateTime
        """
        creation_date = self.created()
        return creation_date
1106
1107
    @security.public
1108
    def getParentURL(self):
        """Return the URL path of the parent of this analysis.

        This method is used to populate catalog values.

        :return: the absolute url path of the parent, or None if the
            analysis has no parent
        """
        container = self.aq_parent
        if not container:
            return None
        return container.absolute_url_path()
1115
1116
    @security.public
1117
    def getWorksheetUID(self):
        """Return the UID of the worksheet this analysis is assigned to.

        This method is used to populate catalog values.

        :return: the worksheet UID, or None if the analysis is not assigned
            to exactly one worksheet (more than one is logged as an error)
        """
        ws_uids = get_backreferences(self, relationship="WorksheetAnalysis")
        if not ws_uids:
            # Not assigned to any worksheet
            return None

        if len(ws_uids) == 1:
            return ws_uids[0]

        # An analysis cannot belong to several worksheets at once
        logger.error("More than one worksheet: {}".format(api.get_path(self)))
        return None
1131
1132
    @security.public
1133
    def getWorksheet(self):
        """Return the worksheet this analysis is assigned to.

        :return: the object for the worksheet UID, or None if it cannot be
            resolved
        """
        return api.get_object_by_uid(self.getWorksheetUID(), None)
1138
1139
    @security.public
1140
    def remove_duplicates(self, ws):
        """Remove from the worksheet the duplicates of this analysis.

        Meant to be called when this analysis is unassigned from a
        worksheet: every DuplicateAnalysis in the worksheet whose source
        analysis is this one gets removed.

        :param ws: the worksheet to remove the duplicates from
        """
        for candidate in ws.objectValues():
            if not IDuplicateAnalysis.providedBy(candidate):
                continue
            # Only duplicates that were created from this analysis
            if candidate.getAnalysis().UID() == self.UID():
                ws.removeAnalysis(candidate)
1148
1149
    def setInterimValue(self, keyword, value):
        """Store a value in the interim field with the given keyword.

        The value is normalized first: None becomes an empty string, strings
        are stripped, and lists, tuples, sets and dicts are dumped as JSON.
        It is always stored as a string. Interims with another keyword are
        left untouched.

        :param keyword: the keyword of the interim
        :param value: the value for the interim
        """
        # Ensure value format integrity
        if value is None:
            normalized = ""
        elif isinstance(value, string_types):
            normalized = value.strip()
        elif isinstance(value, (list, tuple, set, dict)):
            normalized = json.dumps(value)
        else:
            normalized = value

        # Work on a copy, so the stored records are not modified in place
        records = copy.deepcopy(self.getInterimFields())
        for record in records:
            if record.get("keyword") == keyword:
                record["value"] = str(normalized)
        self.setInterimFields(records)
1168
1169
    def getInterimValue(self, keyword):
        """Returns the value of an interim of this analysis

        :param keyword: the keyword of the interim
        :return: the value of the interim (empty string if the interim has
            no value), or None if there is no interim with this keyword or
            more than one
        """
        # Build a real list: filter() returns a lazy iterator on Python 3,
        # which is always truthy and has no len()
        interims = [item for item in self.getInterimFields()
                    if item["keyword"] == keyword]
        if not interims:
            logger.warning("Interim '{}' for analysis '{}' not found"
                           .format(keyword, self.getKeyword()))
            return None
        if len(interims) > 1:
            logger.error("More than one interim '{}' found for '{}'"
                         .format(keyword, self.getKeyword()))
            return None
        return interims[0].get('value', '')
1183
1184
    def isRetest(self):
        """Tell whether this analysis is a retest of another analysis.

        :return: True if this analysis holds a raw "retest of" value
        :rtype: bool
        """
        return bool(self.getRawRetestOf())
1190
1191
    def getRetestOfUID(self):
        """Return the UID of the analysis this analysis is a retest of.

        :return: the raw "retest of" value
        """
        source_uid = self.getRawRetestOf()
        return source_uid
1195
1196
    def getRawRetest(self):
        """Return the UID of the retest that comes from this analysis.

        The retest is looked up through the back-references of the
        "RetestOf" field relationship.

        :return: the UID of the first retest found, or None if there is no
            retest. A warning is logged if there are several.
        """
        retest_field = self.getField("RetestOf")
        retest_uids = get_backreferences(self, retest_field.relationship)
        if not retest_uids:
            return None
        if len(retest_uids) > 1:
            logger.warn("Analysis {} with multiple retests".format(self.id))
        return retest_uids[0]
1206
1207
    def getRetest(self):
        """Return the retest that comes from this analysis.

        :return: the object for the retest UID, or None if it cannot be
            resolved
        """
        return api.get_object(self.getRawRetest(), default=None)
1212
1213
    def isRetested(self):
        """Tell whether this analysis has been retested.

        :return: True if a retest UID exists for this analysis
        :rtype: bool
        """
        return bool(self.getRawRetest())
1219