AbstractAnalysis.getLowerDetectionLimit()   A
last analyzed

Complexity

Conditions 3

Size

Total Lines 16
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 9
dl 0
loc 16
rs 9.95
c 0
b 0
f 0
cc 3
nop 1
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2025 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import cgi
22
import copy
23
import json
24
import math
25
from decimal import Decimal
26
27
from AccessControl import ClassSecurityInfo
28
from bika.lims import api
29
from bika.lims import bikaMessageFactory as _
30
from bika.lims import deprecated
31
from bika.lims import logger
32
from bika.lims.browser.fields import HistoryAwareReferenceField
33
from bika.lims.browser.fields import InterimFieldsField
34
from bika.lims.browser.fields import ResultRangeField
35
from bika.lims.browser.fields import UIDReferenceField
36
from bika.lims.browser.fields.uidreferencefield import get_backreferences
37
from bika.lims.browser.widgets import RecordsWidget
38
from bika.lims.config import LDL
39
from bika.lims.config import UDL
40
from bika.lims.content.abstractbaseanalysis import AbstractBaseAnalysis
41
from bika.lims.content.abstractbaseanalysis import schema
42
from bika.lims.interfaces import IDuplicateAnalysis
43
from bika.lims.utils import formatDecimalMark
44
from bika.lims.utils.analysis import format_numeric_result
45
from bika.lims.utils.analysis import get_significant_digits
46
from bika.lims.workflow import getTransitionActor
47
from bika.lims.workflow import getTransitionDate
48
from DateTime import DateTime
49
from Products.Archetypes.Field import IntegerField
50
from Products.Archetypes.Field import StringField
51
from Products.Archetypes.references import HoldingReference
52
from Products.Archetypes.Schema import Schema
53
from Products.CMFCore.permissions import View
54
from senaite.core.api import dtime
55
from senaite.core.browser.fields.datetime import DateTimeField
56
from senaite.core.i18n import translate as t
57
from senaite.core.i18n import get_dt_format
58
from senaite.core.permissions import FieldEditAnalysisResult
59
from senaite.core.permissions import ViewResults
60
from six import string_types
61
62
# A link directly to the AnalysisService object used to create the analysis
63
AnalysisService = UIDReferenceField(
64
    'AnalysisService'
65
)
66
67
# Attachments which are added manually in the UI, or automatically when
68
# results are imported from a file supplied by an instrument.
69
Attachment = UIDReferenceField(
70
    'Attachment',
71
    multiValued=1,
72
    allowed_types=('Attachment',),
73
    relationship='AnalysisAttachment'
74
)
75
76
# The final result of the analysis is stored here
77
Result = StringField(
78
    'Result',
79
    read_permission=ViewResults,
80
    write_permission=FieldEditAnalysisResult,
81
)
82
83
# When the result is changed, this value is updated to the current time.
84
# Only the most recent result capture date is recorded here and used to
85
# populate catalog values, however the workflow review_history can be
86
# used to get all dates of result capture
87
ResultCaptureDate = DateTimeField(
88
    'ResultCaptureDate',
89
    read_permission=View,
90
    write_permission=FieldEditAnalysisResult,
91
    max="current",
92
)
93
94
# Returns the retracted analysis this analysis is a retest of
95
RetestOf = UIDReferenceField(
96
    'RetestOf',
97
    relationship="AnalysisRetestOf",
98
)
99
100
# If the result is outside of the detection limits of the method or instrument,
101
# the operand (< or >) is stored here.  For routine analyses this is taken
102
# from the Result, if the result entered explicitly startswith "<" or ">"
103
DetectionLimitOperand = StringField(
104
    'DetectionLimitOperand',
105
    read_permission=View,
106
    write_permission=FieldEditAnalysisResult,
107
)
108
109
# The ID of the logged in user who submitted the result for this Analysis.
110
Analyst = StringField(
111
    'Analyst'
112
)
113
114
# The actual uncertainty for this analysis' result, populated from the ranges
115
# specified in the analysis service when the result is submitted.
116
Uncertainty = StringField(
117
    "Uncertainty",
118
    read_permission=View,
119
    write_permission=FieldEditAnalysisResult,
120
    precision=10,
121
)
122
123
# transitioned to a 'verified' state. This value is set automatically
124
# when the analysis is created, based on the value set for the property
125
# NumberOfRequiredVerifications from the Analysis Service
126
NumberOfRequiredVerifications = IntegerField(
127
    'NumberOfRequiredVerifications',
128
    default=1
129
)
130
131
# Routine Analyses and Reference Analysis have a versioned link to
132
# the calculation at creation time.
133
Calculation = HistoryAwareReferenceField(
134
    'Calculation',
135
    read_permission=View,
136
    write_permission=FieldEditAnalysisResult,
137
    allowed_types=('Calculation',),
138
    relationship='AnalysisCalculation',
139
    referenceClass=HoldingReference
140
)
141
142
# InterimFields are defined in Calculations, Services, and Analyses.
143
# In Analysis Services, the default values are taken from Calculation.
144
# In Analyses, the default values are taken from the Analysis Service.
145
# When instrument results are imported, the values in analysis are overridden
146
# before the calculation is performed.
147
InterimFields = InterimFieldsField(
148
    'InterimFields',
149
    read_permission=View,
150
    write_permission=FieldEditAnalysisResult,
151
    schemata='Method',
152
    widget=RecordsWidget(
153
        label=_("Calculation Interim Fields"),
154
        description=_(
155
            "Values can be entered here which will override the defaults "
156
            "specified in the Calculation Interim Fields."),
157
    )
158
)
159
160
# Results Range that applies to this analysis
161
ResultsRange = ResultRangeField(
162
    "ResultsRange",
163
    required=0
164
)
165
166
schema = schema.copy() + Schema((
167
    AnalysisService,
168
    Analyst,
169
    Attachment,
170
    DetectionLimitOperand,
171
    # NumberOfRequiredVerifications overrides AbstractBaseClass
172
    NumberOfRequiredVerifications,
173
    Result,
174
    ResultCaptureDate,
175
    RetestOf,
176
    Uncertainty,
177
    Calculation,
178
    InterimFields,
179
    ResultsRange,
180
))
181
182
183
class AbstractAnalysis(AbstractBaseAnalysis):
184
    security = ClassSecurityInfo()
185
    displayContentsTab = False
186
    schema = schema
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable schema does not seem to be defined.
Loading history...
187
188
    @deprecated('[1705] Currently returns the Analysis object itself.  If you '
189
                'need to get the service, use getAnalysisService instead')
190
    @security.public
191
    def getService(self):
192
        return self
193
194
    def getServiceUID(self):
195
        """Return the UID of the associated service.
196
        """
197
        return self.getRawAnalysisService()
198
199
    @security.public
200
    def getNumberOfVerifications(self):
201
        return len(self.getVerificators())
202
203
    @security.public
204
    def getNumberOfRemainingVerifications(self):
205
        required = self.getNumberOfRequiredVerifications()
206
        done = self.getNumberOfVerifications()
207
        if done >= required:
208
            return 0
209
        return required - done
210
211
    # TODO Workflow - analysis . Remove?
212
    @security.public
213
    def getLastVerificator(self):
214
        verifiers = self.getVerificators()
215
        return verifiers and verifiers[-1] or None
216
217
    @security.public
218
    def getVerificators(self):
219
        """Returns the user ids of the users that verified this analysis
220
        """
221
        verifiers = list()
222
        actions = ["retest", "verify", "multi_verify"]
223
        for event in api.get_review_history(self, rev=False):
224
            if event.get("review_state") == "verified":
225
                # include all transitions their end state is 'verified'
226
                verifiers.append(event["actor"])
227
            elif event.get("action") in actions:
228
                # include some transitions their end state is not 'verified'
229
                verifiers.append(event["actor"])
230
        return verifiers
231
232
    @security.public
233
    def getDefaultUncertainty(self):
234
        """Return the uncertainty value, if the result falls within
235
        specified ranges for the service from which this analysis was derived.
236
        """
237
        result = self.getResult()
238
        if not api.is_floatable(result):
239
            return None
240
241
        uncertainties = self.getUncertainties()
242
        if not uncertainties:
243
            return None
244
245
        result = api.to_float(result)
246
        for record in uncertainties:
247
248
            # convert to min/max
249
            unc_min = api.to_float(record["intercept_min"], default=0)
250
            unc_max = api.to_float(record["intercept_max"], default=0)
251
252
            if unc_min <= result <= unc_max:
253
                # result is within the range defined for this uncertainty
254
                uncertainty = str(record["errorvalue"]).strip()
255
                if uncertainty.endswith("%"):
256
                    # uncertainty expressed as a percentage of the result
257
                    try:
258
                        percentage = float(uncertainty.replace("%", ""))
259
                        uncertainty = result / 100 * percentage
260
                    except ValueError:
261
                        return None
262
                else:
263
                    uncertainty = api.to_float(uncertainty, default=0)
264
265
                # convert back to string value
266
                return api.float_to_string(uncertainty, default=None)
267
268
        return None
269
270
    @security.public
271
    def getUncertainty(self):
272
        """Returns the uncertainty for this analysis.
273
274
        Returns the value from Schema's Uncertainty field if the Service has
275
        the option 'Allow manual uncertainty'.
276
        Otherwise, do a callback to getDefaultUncertainty().
277
278
        Returns None if no result specified and the current result for this
279
        analysis is outside of the quantifiable range.
280
        """
281
        if self.isOutsideTheQuantifiableRange():
282
            # does not make sense to display uncertainty if the result is
283
            # outside of the quantifiable because the measurement is not
284
            # reliable or accurate enough to confidently quantify the analyte
285
            return None
286
287
        uncertainty = self.getField("Uncertainty").get(self)
288
        if uncertainty and self.getAllowManualUncertainty():
289
            # the uncertainty has been manually set on results introduction
290
            return api.float_to_string(uncertainty, default=None)
291
292
        # fallback to the default uncertainty for this analysis
293
        return self.getDefaultUncertainty()
294
295
    @security.public
296
    def setUncertainty(self, unc):
297
        """Sets the uncertainty for this analysis
298
299
        If the result is a Detection Limit or the value is below LDL or upper
300
        UDL, set the uncertainty to None``
301
        """
302
        if self.isOutsideTheQuantifiableRange():
303
            unc = None
304
305
        field = self.getField("Uncertainty")
306
        field.set(self, api.float_to_string(unc, default=None))
307
308
    @security.public
309
    def setDetectionLimitOperand(self, value):
310
        """Set detection limit operand for this analysis
311
        Allowed detection limit operands are `<` and `>`.
312
        """
313
        manual_dl = self.getAllowManualDetectionLimit()
314
        selector = self.getDetectionLimitSelector()
315
        if not manual_dl and not selector:
316
            # Don't allow the user to set the limit operand if manual assignment
317
            # is not allowed and selector is not visible
318
            return
319
320
        # Changing the detection limit operand has a side effect on the result
321
        result = self.getResult()
322
        if value in [LDL, UDL]:
323
            # flush uncertainty
324
            self.setUncertainty("")
325
326
            # If no previous result or user is not allowed to manually set the
327
            # the detection limit, override the result with default LDL/UDL
328
            has_result = api.is_floatable(result)
329
            if not has_result or not manual_dl:
330
                # set the result according to the system default UDL/LDL values
331
                if value == LDL:
332
                    result = self.getLowerDetectionLimit()
333
                else:
334
                    result = self.getUpperDetectionLimit()
335
336
        else:
337
            value = ""
338
339
        # Set the result
340
        self.getField("Result").set(self, result)
341
342
        # Set the detection limit to the field
343
        self.getField("DetectionLimitOperand").set(self, value)
344
345
    # Method getLowerDetectionLimit overrides method of class BaseAnalysis
346
    @security.public
347
    def getLowerDetectionLimit(self):
348
        """Returns the Lower Detection Limit (LDL) that applies to this
349
        analysis in particular. If no value set or the analysis service
350
        doesn't allow manual input of detection limits, returns the value set
351
        by default in the Analysis Service
352
        """
353
        if self.isLowerDetectionLimit():
354
            result = self.getResult()
355
            if api.is_floatable(result):
356
                return result
357
358
            logger.warn("The result for the analysis %s is a lower detection "
359
                        "limit, but not floatable: '%s'. Returning AS's "
360
                        "default LDL." % (self.id, result))
361
        return AbstractBaseAnalysis.getLowerDetectionLimit(self)
362
363
    # Method getUpperDetectionLimit overrides method of class BaseAnalysis
364
    @security.public
365
    def getUpperDetectionLimit(self):
366
        """Returns the Upper Detection Limit (UDL) that applies to this
367
        analysis in particular. If no value set or the analysis service
368
        doesn't allow manual input of detection limits, returns the value set
369
        by default in the Analysis Service
370
        """
371
        if self.isUpperDetectionLimit():
372
            result = self.getResult()
373
            if api.is_floatable(result):
374
                return result
375
376
            logger.warn("The result for the analysis %s is an upper detection "
377
                        "limit, but not floatable: '%s'. Returning AS's "
378
                        "default UDL." % (self.id, result))
379
        return AbstractBaseAnalysis.getUpperDetectionLimit(self)
380
381
    @security.public
382
    def isBelowLowerDetectionLimit(self):
383
        """Returns True if the result is below the Lower Detection Limit or
384
        if Lower Detection Limit has been manually set
385
        """
386
        if self.isLowerDetectionLimit():
387
            return True
388
389
        result = self.getResult()
390
        if result and str(result).strip().startswith(LDL):
391
            return True
392
393
        if api.is_floatable(result):
394
            ldl = self.getLowerDetectionLimit()
395
            return api.to_float(result) < api.to_float(ldl, 0.0)
396
397
        return False
398
399
    @security.public
400
    def isAboveUpperDetectionLimit(self):
401
        """Returns True if the result is above the Upper Detection Limit or
402
        if Upper Detection Limit has been manually set
403
        """
404
        if self.isUpperDetectionLimit():
405
            return True
406
407
        result = self.getResult()
408
        if result and str(result).strip().startswith(UDL):
409
            return True
410
411
        if api.is_floatable(result):
412
            udl = self.getUpperDetectionLimit()
413
            return api.to_float(result) > api.to_float(udl, 0.0)
414
415
        return False
416
417
    @security.public
418
    def getLowerLimitOfQuantification(self):
419
        """Returns the Lower Limit of Quantification (LLOQ) for the current
420
        analysis. If the defined LLOQ is lower than the Lower Limit of
421
        Detection (LLOD), the function returns the LLOD instead. This ensures
422
        the result respects the detection threshold
423
        """
424
        llod = self.getLowerDetectionLimit()
425
        lloq = self.getField("LowerLimitOfQuantification").get(self)
426
        return llod if api.to_float(lloq) < api.to_float(llod) else lloq
427
428
    @security.public
429
    def getUpperLimitOfQuantification(self):
430
        """Returns the Upper Limit of Quantification (ULOQ) for the current
431
        analysis. If the defined ULOQ is greater than the Upper Limit of
432
        Detection (ULOD), the function returns the ULOD instead. This ensures
433
        the result respects the detection threshold
434
        """
435
        ulod = self.getUpperDetectionLimit()
436
        uloq = self.getField("UpperLimitOfQuantification").get(self)
437
        return ulod if api.to_float(uloq) > api.to_float(ulod) else uloq
438
439
    @security.public
440
    def isBelowLimitOfQuantification(self):
441
        """Returns whether the result is below the Limit of Quantification LOQ
442
        """
443
        result = self.getResult()
444
        if not api.is_floatable(result):
445
            return False
446
447
        lloq = self.getLowerLimitOfQuantification()
448
        return api.to_float(result) < api.to_float(lloq)
449
450
    @security.public
451
    def isAboveLimitOfQuantification(self):
452
        """Returns whether the result is above the Limit of Quantification LOQ
453
        """
454
        result = self.getResult()
455
        if not api.is_floatable(result):
456
            return False
457
458
        uloq = self.getUpperLimitOfQuantification()
459
        return api.to_float(result) > api.to_float(uloq)
460
461
    @security.public
462
    def isOutsideTheQuantifiableRange(self):
463
        """Returns whether the result falls outside the quantifiable range
464
        specified by the Lower Limit of Quantification (LLOQ) and Upper Limit
465
        of Quantification (ULOQ).
466
        """
467
        if self.isBelowLimitOfQuantification():
468
            return True
469
        if self.isAboveLimitOfQuantification():
470
            return True
471
        return False
472
473
    # TODO: REMOVE:  nowhere used
474
    @deprecated("This Method will be removed in version 2.5")
475
    @security.public
476
    def getDetectionLimits(self):
477
        """Returns a two-value array with the limits of detection (LDL and
478
        UDL) that applies to this analysis in particular. If no value set or
479
        the analysis service doesn't allow manual input of detection limits,
480
        returns the value set by default in the Analysis Service
481
        """
482
        ldl = self.getLowerDetectionLimit()
483
        udl = self.getUpperDetectionLimit()
484
        return [api.to_float(ldl, 0.0), api.to_float(udl, 0.0)]
485
486
    @security.public
487
    def isLowerDetectionLimit(self):
488
        """Returns True if the result for this analysis represents a Lower
489
        Detection Limit. Otherwise, returns False
490
        """
491
        return self.getDetectionLimitOperand() == LDL
492
493
    @security.public
494
    def isUpperDetectionLimit(self):
495
        """Returns True if the result for this analysis represents an Upper
496
        Detection Limit. Otherwise, returns False
497
        """
498
        return self.getDetectionLimitOperand() == UDL
499
500
    @security.public
501
    def getDependents(self):
502
        """Return a list of analyses who depend on us to calculate their result
503
        """
504
        raise NotImplementedError("getDependents is not implemented.")
505
506
    @security.public
507
    def getDependencies(self, with_retests=False):
508
        """Return a list of siblings who we depend on to calculate our result.
509
        :param with_retests: If false, siblings with retests are dismissed
510
        :type with_retests: bool
511
        :return: Analyses the current analysis depends on
512
        :rtype: list of IAnalysis
513
        """
514
        raise NotImplementedError("getDependencies is not implemented.")
515
516
    @security.public
517
    def setResult(self, value):
518
        """Validate and set a value into the Result field, taking into
519
        account the Detection Limits.
520
        :param value: is expected to be a string.
521
        """
522
        # Convert to list ff the analysis has result options set with multi
523
        if self.getResultOptions() and "multi" in self.getResultType():
524
            if not isinstance(value, (list, tuple)):
525
                value = filter(None, [value])
526
527
        # Handle list results
528
        if isinstance(value, (list, tuple)):
529
            value = json.dumps(value)
530
531
        # Ensure result integrity regards to None, empty and 0 values
532
        val = str("" if not value and value != 0 else value).strip()
533
534
        # Check if a date/time result
535
        result_type = self.getResultType()
536
        if result_type in ["date", "datetime"]:
537
            # convert to datetime
538
            dt = dtime.to_dt(val)
539
            # make it TZ-naive to prevent undesired shifts
540
            dt = dt.replace(tzinfo=None) if dt else None
541
            # store as ISO format for easy handling
542
            with_time = result_type == "datetime"
543
            fmt = "%Y-%m-%d %H:%M:%S" if with_time else "%Y-%m-%d"
544
            val = dtime.date_to_string(dt, fmt=fmt)
545
            self.getField("Result").set(self, val)
546
            return
547
548
        # Check if an string result is expected
549
        string_result = self.getStringResult()
550
551
        # UDL/LDL directly entered in the results field
552
        if not string_result and val[:1] in [LDL, UDL]:
553
            # Strip off the detection limit operand from the result
554
            operand = val[0]
555
            val = val.replace(operand, "", 1).strip()
556
557
            # Result becomes the detection limit
558
            selector = self.getDetectionLimitSelector()
559
            allow_manual = self.getAllowManualDetectionLimit()
560
            if any([selector, allow_manual]):
561
562
                # Set the detection limit operand
563
                self.setDetectionLimitOperand(operand)
564
565
                if not allow_manual:
566
                    # Manual introduction of DL is not permitted
567
                    if operand == LDL:
568
                        # Result is default LDL
569
                        val = self.getLowerDetectionLimit()
570
                    else:
571
                        # Result is default UDL
572
                        val = self.getUpperDetectionLimit()
573
574
        elif not self.getDetectionLimitSelector():
575
            # User cannot choose the detection limit from a selection list,
576
            # but might be allowed to manually enter the dl with the result.
577
            # If so, reset the detection limit operand, cause the previous
578
            # entered result might be an DL, but current doesn't
579
            self.setDetectionLimitOperand("")
580
581
        # Set the result field
582
        self.getField("Result").set(self, val)
583
584
    @security.public
585
    def calculateResult(self, override=False, cascade=False):
586
        """Calculates the result for the current analysis if it depends of
587
        other analysis/interim fields. Otherwise, do nothing
588
        """
589
        if self.getResult() and override is False:
590
            return False
591
592
        calc = self.getCalculation()
593
        if not calc:
594
            return False
595
596
        # get the formula from the calculation
597
        formula = calc.getMinifiedFormula()
598
599
        # Include the current context UID in the mapping, so it can be passed
600
        # as a param in built-in functions, like 'get_result(%(context_uid)s)'
601
        mapping = {"context_uid": '"{}"'.format(self.UID())}
602
603
        # Interims' priority order (from low to high):
604
        # Calculation < Analysis
605
        interims = calc.getInterimFields() + self.getInterimFields()
606
607
        # Add interims to mapping
608
        for i in interims:
609
610
            interim_keyword = i.get("keyword")
611
            if not interim_keyword:
612
                continue
613
614
            # skip unset values
615
            interim_value = i.get("value", "")
616
            if interim_value == "":
617
                continue
618
619
            # Convert to floatable if necessary
620
            if api.is_floatable(interim_value):
621
                interim_value = float(interim_value)
622
            else:
623
                # If the interim value is a string, since the formula is also a string,
624
                # it is needed to wrap the string interim values in between inverted commas.
625
                #
626
                # E.g. formula = '"ok" if %(var)s == "example_value" else "not ok"'
627
                #
628
                # if interim_value = "example_value" after
629
                # formula = eval("'%s'%%mapping" % formula, {'mapping': {'var': interim_value}})
630
                # print(formula)
631
                # > '"ok" if example_value == "example_value" else "not ok"' -> Error
632
                #
633
                # else if interim_value ='"example_value"' after
634
                # formula = eval("'%s'%%mapping" % formula, {'mapping': {'var': interim_value}})
635
                # print(formula)
636
                # > '"ok" if "example_value" == "example_value" else "not ok"' -> Correct
637
                interim_value = '"{}"'.format(interim_value)
638
639
            # Convert 'Numeric' interim values using `float`. Convert the rest using `str`
640
            converter = "s" if i.get("result_type") else "f"
641
            formula = formula.replace(
642
                "[" + interim_keyword + "]", "%(" + interim_keyword + ")" + converter
643
            )
644
645
            mapping[interim_keyword] = interim_value
646
647
        # Add dependencies results to mapping
648
        dependencies = self.getDependencies()
649
        for dependency in dependencies:
650
            result = dependency.getResult()
651
            # check if the dependency is a string result
652
            str_result = dependency.getStringResult()
653
            keyword = dependency.getKeyword()
654
655
            # Dependency without results found
656
            if not result and cascade:
657
                # Try to calculate the dependency result
658
                dependency.calculateResult(override, cascade)
659
                result = dependency.getResult()
660
661
            if result:
662
                try:
663
                    # we need to quote a string result because of the `eval` below
664
                    result = '"%s"' % result if str_result else float(str(result))
665
                    key = dependency.getKeyword()
666
                    ldl = dependency.getLowerDetectionLimit()
667
                    udl = dependency.getUpperDetectionLimit()
668
                    lloq = dependency.getLowerLimitOfQuantification()
669
                    uloq = dependency.getUpperLimitOfQuantification()
670
                    bdl = dependency.isBelowLowerDetectionLimit()
671
                    adl = dependency.isAboveUpperDetectionLimit()
672
                    bloq = dependency.isBelowLimitOfQuantification()
673
                    aloq = dependency.isAboveLimitOfQuantification()
674
                    mapping[key] = result
675
                    mapping['%s.%s' % (key, 'RESULT')] = result
676
                    mapping['%s.%s' % (key, 'LDL')] = api.to_float(ldl, 0.0)
677
                    mapping['%s.%s' % (key, 'UDL')] = api.to_float(udl, 0.0)
678
                    mapping['%s.%s' % (key, 'LOQ')] = api.to_float(lloq, 0.0)
679
                    mapping['%s.%s' % (key, 'LLOQ')] = api.to_float(lloq, 0.0)
680
                    mapping['%s.%s' % (key, 'ULOQ')] = api.to_float(uloq, 0.0)
681
                    mapping['%s.%s' % (key, 'BELOWLDL')] = int(bdl)
682
                    mapping['%s.%s' % (key, 'ABOVEUDL')] = int(adl)
683
                    mapping['%s.%s' % (key, 'BELOWLOQ')] = int(bloq)
684
                    mapping['%s.%s' % (key, 'BELOWLLOQ')] = int(bloq)
685
                    mapping['%s.%s' % (key, 'ABOVEULOQ')] = int(aloq)
686
                except (TypeError, ValueError):
687
                    return False
688
689
                # replace placeholder -> formatting string
690
                # https://docs.python.org/2.7/library/stdtypes.html?highlight=built#string-formatting-operations
691
                converter = "s" if str_result else "f"
692
                formula = formula.replace("[" + keyword + "]", "%(" + keyword + ")" + converter)
693
            else:
694
                # flush eventual previously set result
695
                if self.getResult():
696
                    self.setResult("")
697
                    return True
698
699
                return False
700
701
        # convert any remaining placeholders, e.g. from interims etc.
702
        # NOTE: we assume remaining values are all floatable!
703
        formula = formula.replace("[", "%(").replace("]", ")f")
704
705
        # Calculate
706
        try:
707
            formula = eval("'%s'%%mapping" % formula,
708
                           {"__builtins__": None,
709
                            'math': math,
710
                            'context': self},
711
                           {'mapping': mapping})
712
            result = eval(formula, calc._getGlobals())
713
        except ZeroDivisionError:
714
            self.setResult('0/0')
715
            return True
716
        except (KeyError, TypeError, ImportError) as e:
717
            msg = "Cannot eval formula ({}): {}".format(e.message, formula)
718
            logger.error(msg)
719
            self.setResult("NA")
720
            return True
721
722
        self.setResult(str(result))
723
        return True
724
725
    @security.public
726
    def getVATAmount(self):
727
        """Compute the VAT amount without member discount.
728
        :return: the result as a float
729
        """
730
        vat = self.getVAT()
731
        price = self.getPrice()
732
        return Decimal(price) * Decimal(vat) / 100
733
734
    @security.public
735
    def getTotalPrice(self):
736
        """Obtain the total price without client's member discount. The function
737
        keeps in mind the client's bulk discount.
738
        :return: the result as a float
739
        """
740
        return Decimal(self.getPrice()) + Decimal(self.getVATAmount())
741
742
    @security.public
743
    def getDuration(self):
744
        """Returns the time in minutes taken for this analysis.
745
        If the analysis is not yet 'ready to process', returns 0
746
        If the analysis is still in progress (not yet verified),
747
            duration = date_verified - date_start_process
748
        Otherwise:
749
            duration = current_datetime - date_start_process
750
        :return: time in minutes taken for this analysis
751
        :rtype: int
752
        """
753
        starttime = self.getStartProcessDate()
754
        if not starttime:
755
            # The analysis is not yet ready to be processed
756
            return 0
757
        endtime = self.getDateVerified() or DateTime()
758
759
        # Duration in minutes
760
        duration = (endtime - starttime) * 24 * 60
761
        return duration
762
763
    @security.public
764
    def getEarliness(self):
765
        """The remaining time in minutes for this analysis to be completed.
766
        Returns zero if the analysis is neither 'ready to process' nor a
767
        turnaround time is set.
768
            earliness = duration - max_turnaround_time
769
        The analysis is late if the earliness is negative
770
        :return: the remaining time in minutes before the analysis reaches TAT
771
        :rtype: int
772
        """
773
        maxtime = self.getMaxTimeAllowed()
774
        if not maxtime:
775
            # No Turnaround time is set for this analysis
776
            return 0
777
        if api.to_minutes(**maxtime) == 0:
778
            return 0
779
        return api.to_minutes(**maxtime) - self.getDuration()
780
781
    @security.public
782
    def isLateAnalysis(self):
783
        """Returns true if the analysis is late in accordance with the maximum
784
        turnaround time. If no maximum turnaround time is set for this analysis
785
        or it is not yet ready to be processed, or there is still time
786
        remaining (earliness), returns False.
787
        :return: true if the analysis is late
788
        :rtype: bool
789
        """
790
        return self.getEarliness() < 0
791
792
    @security.public
793
    def getLateness(self):
794
        """The time in minutes that exceeds the maximum turnaround set for this
795
        analysis. If the analysis has no turnaround time set or is not ready
796
        for process yet, returns 0. The analysis is not late if the lateness is
797
        negative
798
        :return: the time in minutes that exceeds the maximum turnaround time
799
        :rtype: int
800
        """
801
        return -self.getEarliness()
802
803
    @security.public
804
    def isInstrumentAllowed(self, instrument):
805
        """Checks if the specified instrument can be set for this analysis,
806
807
        :param instrument: string,Instrument
808
        :return: True if the assignment of the passed in instrument is allowed
809
        :rtype: bool
810
        """
811
        uid = api.get_uid(instrument)
812
        return uid in self.getRawAllowedInstruments()
813
814
    @security.public
815
    def isMethodAllowed(self, method):
816
        """Checks if the analysis can follow the method specified
817
818
        :param method: string,Method
819
        :return: True if the analysis can follow the method specified
820
        :rtype: bool
821
        """
822
        uid = api.get_uid(method)
823
        return uid in self.getRawAllowedMethods()
824
825
    @security.public
826
    def getAllowedMethods(self):
827
        """Returns the allowed methods for this analysis, either if the method
828
        was assigned directly (by using "Allows manual entry of results") or
829
        indirectly via Instrument ("Allows instrument entry of results") in
830
        Analysis Service Edit View.
831
        :return: A list with the methods allowed for this analysis
832
        :rtype: list of Methods
833
        """
834
        service = self.getAnalysisService()
835
        if not service:
836
            return []
837
        # get the available methods of the service
838
        return service.getMethods()
839
840
    @security.public
841
    def getRawAllowedMethods(self):
842
        """Returns the UIDs of the allowed methods for this analysis
843
        """
844
        service = self.getAnalysisService()
845
        if not service:
846
            return []
847
        return service.getRawMethods()
848
849
    @security.public
850
    def getAllowedInstruments(self):
851
        """Returns the allowed instruments from the service
852
853
        :return: A list of instruments allowed for this Analysis
854
        :rtype: list of instruments
855
        """
856
        service = self.getAnalysisService()
857
        if not service:
858
            return []
859
        return service.getInstruments()
860
861
    @security.public
862
    def getRawAllowedInstruments(self):
863
        """Returns the UIDS of the allowed instruments from the service
864
        """
865
        service = self.getAnalysisService()
866
        if not service:
867
            return []
868
        return service.getRawInstruments()
869
870
    @security.public
871
    def getFormattedResult(self, specs=None, decimalmark='.', sciformat=1,
872
                           html=True):
873
        """Formatted result:
874
        0: If the result type is StringResult, return it without being formatted
875
        1. If the result is a detection limit, returns '< LDL' or '> UDL'
876
        2. Print ResultText of matching ResultOptions
877
        3. If the result is not floatable, return it without being formatted
878
        4. If the analysis specs has hidemin or hidemax enabled and the
879
           result is out of range, render result as '<min' or '>max'
880
        5. If the result is below Lower Detection Limit, show '<LDL'
881
        6. If the result is above Upper Detecion Limit, show '>UDL'
882
        7. Otherwise, render numerical value
883
        :param specs: Optional result specifications, a dictionary as follows:
884
            {'min': <min_val>,
885
             'max': <max_val>,
886
             'error': <error>,
887
             'hidemin': <hidemin_val>,
888
             'hidemax': <hidemax_val>}
889
        :param decimalmark: The string to be used as a decimal separator.
890
            default is '.'
891
        :param sciformat: 1. The sci notation has to be formatted as aE^+b
892
                          2. The sci notation has to be formatted as a·10^b
893
                          3. As 2, but with super html entity for exp
894
                          4. The sci notation has to be formatted as a·10^b
895
                          5. As 4, but with super html entity for exp
896
                          By default 1
897
        :param html: if true, returns an string with the special characters
898
            escaped: e.g: '<' and '>' (LDL and UDL for results like < 23.4).
899
        """
900
        result = self.getResult()
901
902
        # If result options, return text of matching option
903
        choices = self.getResultOptions()
904
        if choices:
905
            # Create a dict for easy mapping of result options
906
            values_texts = dict(map(
907
                lambda c: (str(c["ResultValue"]), c["ResultText"]), choices
908
            ))
909
910
            # Result might contain a single result option
911
            match = values_texts.get(str(result))
912
            if match:
913
                return match
914
915
            # Result might be a string with multiple options e.g. "['2', '1']"
916
            try:
917
                raw_result = json.loads(result)
918
                texts = map(lambda r: values_texts.get(str(r)), raw_result)
0 ignored issues
show
introduced by
The variable values_texts does not seem to be defined in case choices on line 904 is False. Are you sure this can never be the case?
Loading history...
919
                texts = filter(None, texts)
920
                return "<br/>".join(texts)
921
            except (ValueError, TypeError):
922
                pass
923
924
        result_type = self.getResultType()
925
926
        # If date result, apply the format for current locale, without TZ
927
        if result_type in ["date", "datetime"]:
928
            # convert to datetime
929
            dt = dtime.to_dt(result)
930
            # make TZ-naive to prevent undesired shifts
931
            dt = dt.replace(tzinfo=None) if dt else None
932
            # apply format set for current locale, but without localizing
933
            fmt = get_dt_format(result_type)
934
            result = dtime.date_to_string(dt, fmt)
935
            return cgi.escape(result) if html else result
936
937
        # If string-like result, return without any formatting
938
        if result_type in ["string", "text"]:
939
            if html:
940
                result = result if api.is_string(result) else str(result)
941
                result = cgi.escape(result)
942
                result = result.replace("\n", "<br/>")
943
            return result
944
945
        # If a detection limit, return '< LDL' or '> UDL'
946
        dl = self.getDetectionLimitOperand()
947
        if dl:
948
            try:
949
                res = api.float_to_string(float(result))
950
                fdm = formatDecimalMark(res, decimalmark)
951
                hdl = cgi.escape(dl) if html else dl
952
                return '%s %s' % (hdl, fdm)
953
            except (TypeError, ValueError):
954
                logger.warn(
955
                    "The result for the analysis %s is a detection limit, "
956
                    "but not floatable: %s" % (self.id, result))
957
                return formatDecimalMark(result, decimalmark=decimalmark)
958
959
        # If not floatable, return without any formatting
960
        try:
961
            result = float(result)
962
        except (TypeError, ValueError):
963
            return formatDecimalMark(result, decimalmark=decimalmark)
964
965
        # If specs are set, evaluate if out of range
966
        specs = specs if specs else self.getResultsRange()
967
        hidemin = specs.get('hidemin', '')
968
        hidemax = specs.get('hidemax', '')
969
        try:
970
            belowmin = hidemin and result < float(hidemin) or False
971
        except (TypeError, ValueError):
972
            belowmin = False
973
        try:
974
            abovemax = hidemax and result > float(hidemax) or False
975
        except (TypeError, ValueError):
976
            abovemax = False
977
978
        # If below min and hidemin enabled, return '<min'
979
        if belowmin:
980
            fdm = formatDecimalMark('< %s' % hidemin, decimalmark)
981
            return fdm.replace('< ', '&lt; ', 1) if html else fdm
982
983
        # If above max and hidemax enabled, return '>max'
984
        if abovemax:
985
            fdm = formatDecimalMark('> %s' % hidemax, decimalmark)
986
            return fdm.replace('> ', '&gt; ', 1) if html else fdm
987
988
        # Lower Limits of Detection and Quantification (LLOD and LLOQ)
989
        llod = api.to_float(self.getLowerDetectionLimit())
990
        lloq = api.to_float(self.getLowerLimitOfQuantification())
991
        if result < llod:
992
            if llod != lloq:
993
                # Display "Not detected"
994
                result = t(_("result_below_llod", default="Not detected"))
995
                return cgi.escape(result) if html else result
996
997
            # Display < LLOD
998
            ldl = api.float_to_string(llod)
999
            result = formatDecimalMark("< %s" % ldl, decimalmark)
1000
            return cgi.escape(result) if html else result
1001
1002
        if result < lloq:
1003
            lloq = api.float_to_string(lloq)
1004
            lloq = formatDecimalMark(lloq, decimalmark)
1005
            result = t(_("result_below_lloq", default="Detected but < ${LLOQ}",
1006
                         mapping={"LLOQ": lloq}))
1007
            return cgi.escape(result) if html else result
1008
1009
        # Upper Limit of Quantification (ULOQ)
1010
        uloq = api.to_float(self.getUpperLimitOfQuantification())
1011
        if result > uloq:
1012
            uloq = api.float_to_string(uloq)
1013
            result = formatDecimalMark('> %s' % uloq, decimalmark)
1014
            return cgi.escape(result) if html else result
1015
1016
        # Render numerical values
1017
        return format_numeric_result(self, decimalmark=decimalmark,
1018
                                     sciformat=sciformat)
1019
1020
    @security.public
1021
    def getPrecision(self):
1022
        """Returns the precision for the Analysis.
1023
1024
        - If ManualUncertainty is set, calculates the precision of the result
1025
          in accordance with the manual uncertainty set.
1026
1027
        - If Calculate Precision from Uncertainty is set in Analysis Service,
1028
          calculates the precision in accordance with the uncertainty inferred
1029
          from uncertainties ranges.
1030
1031
        - If neither Manual Uncertainty nor Calculate Precision from
1032
          Uncertainty are set, returns the precision from the Analysis Service
1033
1034
        - If you have a number with zero uncertainty: If you roll a pair of
1035
        dice and observe five spots, the number of spots is 5. This is a raw
1036
        data point, with no uncertainty whatsoever. So just write down the
1037
        number. Similarly, the number of centimeters per inch is 2.54,
1038
        by definition, with no uncertainty whatsoever. Again: just write
1039
        down the number.
1040
1041
        Further information at AbstractBaseAnalysis.getPrecision()
1042
        """
1043
        precision = self.getField("Precision").get(self)
1044
        allow_manual = self.getAllowManualUncertainty()
1045
        precision_unc = self.getPrecisionFromUncertainty()
1046
        if allow_manual or precision_unc:
1047
1048
            # if no uncertainty rely on the fixed precision
1049
            uncertainty = self.getUncertainty()
1050
            if not api.is_floatable(uncertainty):
1051
                return precision
1052
1053
            uncertainty = api.to_float(uncertainty)
1054
            if uncertainty == 0:
1055
                # calculate the precision from the result
1056
                try:
1057
                    result = str(float(self.getResult()))
1058
                    num_decimals = result[::-1].find('.')
1059
                    return num_decimals
1060
                except ValueError:
1061
                    # result not floatable, return the fixed precision
1062
                    return precision
1063
1064
            # Get the 'raw' significant digits from uncertainty
1065
            sig_digits = get_significant_digits(uncertainty)
1066
            # Round the uncertainty to its significant digit.
1067
            # Needed because the precision for the result has to be based on
1068
            # the *rounded* uncertainty. Note the following for a given
1069
            # uncertainty value:
1070
            #   >>> round(0.09404, 2)
1071
            #   0.09
1072
            #   >>> round(0.09504, 2)
1073
            #   0.1
1074
            # The precision when the uncertainty is 0.09504 is not 2, but 1
1075
            uncertainty = abs(round(uncertainty, sig_digits))
1076
            # Return the significant digit to apply
1077
            return get_significant_digits(uncertainty)
1078
1079
        return precision
1080
1081
    @security.public
1082
    def getAnalyst(self):
1083
        """Returns the stored Analyst or the user who submitted the result
1084
        """
1085
        analyst = self.getField("Analyst").get(self) or self.getAssignedAnalyst()
1086
        if not analyst:
1087
            analyst = self.getSubmittedBy()
1088
        return analyst or ""
1089
1090
    @security.public
1091
    def getAssignedAnalyst(self):
1092
        """Returns the Analyst assigned to the worksheet this
1093
        analysis is assigned to
1094
        """
1095
        worksheet = self.getWorksheet()
1096
        if not worksheet:
1097
            return ""
1098
        return worksheet.getAnalyst() or ""
1099
1100
    @security.public
1101
    def getSubmittedBy(self):
1102
        """
1103
        Returns the identifier of the user who submitted the result if the
1104
        state of the current analysis is "to_be_verified" or "verified"
1105
        :return: the user_id of the user who did the last submission of result
1106
        """
1107
        return getTransitionActor(self, 'submit')
1108
1109
    @security.public
1110
    def getDateSubmitted(self):
1111
        """Returns the time the result was submitted.
1112
        :return: a DateTime object.
1113
        :rtype: DateTime
1114
        """
1115
        return getTransitionDate(self, 'submit', return_as_datetime=True)
1116
1117
    @security.public
1118
    def getDateVerified(self):
1119
        """Returns the time the analysis was verified. If the analysis hasn't
1120
        been yet verified, returns None
1121
        :return: the time the analysis was verified or None
1122
        :rtype: DateTime
1123
        """
1124
        return getTransitionDate(self, 'verify', return_as_datetime=True)
1125
1126
    @security.public
1127
    def getStartProcessDate(self):
1128
        """Returns the date time when the analysis is ready to be processed.
1129
        It returns the datetime when the object was created, but might be
1130
        different depending on the type of analysis (e.g. "Date Received" for
1131
        routine analyses): see overriden functions.
1132
        :return: Date time when the analysis is ready to be processed.
1133
        :rtype: DateTime
1134
        """
1135
        return self.created()
1136
1137
    @security.public
1138
    def getParentURL(self):
1139
        """This method is used to populate catalog values
1140
        This function returns the analysis' parent URL
1141
        """
1142
        parent = self.aq_parent
1143
        if parent:
1144
            return parent.absolute_url_path()
1145
1146
    @security.public
1147
    def getWorksheetUID(self):
1148
        """This method is used to populate catalog values
1149
        Returns WS UID if this analysis is assigned to a worksheet, or None.
1150
        """
1151
        uids = get_backreferences(self, relationship="WorksheetAnalysis")
1152
        if not uids:
1153
            return None
1154
1155
        if len(uids) > 1:
1156
            path = api.get_path(self)
1157
            logger.error("More than one worksheet: {}".format(path))
1158
            return None
1159
1160
        return uids[0]
1161
1162
    @security.public
1163
    def getWorksheet(self):
1164
        """Returns the Worksheet to which this analysis belongs to, or None
1165
        """
1166
        worksheet_uid = self.getWorksheetUID()
1167
        return api.get_object_by_uid(worksheet_uid, None)
1168
1169
    @security.public
1170
    def remove_duplicates(self, ws):
1171
        """When this analysis is unassigned from a worksheet, this function
1172
        is responsible for deleting DuplicateAnalysis objects from the ws.
1173
        """
1174
        for analysis in ws.objectValues():
1175
            if IDuplicateAnalysis.providedBy(analysis) \
1176
                    and analysis.getAnalysis().UID() == self.UID():
1177
                ws.removeAnalysis(analysis)
1178
1179
    def setInterimValue(self, keyword, value):
1180
        """Sets a value to an interim of this analysis
1181
        :param keyword: the keyword of the interim
1182
        :param value: the value for the interim
1183
        """
1184
        # Ensure value format integrity
1185
        if value is None:
1186
            value = ""
1187
        elif isinstance(value, string_types):
1188
            value = value.strip()
1189
        elif isinstance(value, (list, tuple, set, dict)):
1190
            value = json.dumps(value)
1191
1192
        # Ensure result integrity regards to None, empty and 0 values
1193
        interims = copy.deepcopy(self.getInterimFields())
1194
        for interim in interims:
1195
            if interim.get("keyword") == keyword:
1196
                interim["value"] = str(value)
1197
        self.setInterimFields(interims)
1198
1199
    def getInterimValue(self, keyword):
1200
        """Returns the value of an interim of this analysis
1201
        """
1202
        interims = filter(lambda item: item["keyword"] == keyword,
1203
                          self.getInterimFields())
1204
        if not interims:
1205
            logger.warning("Interim '{}' for analysis '{}' not found"
1206
                           .format(keyword, self.getKeyword()))
1207
            return None
1208
        if len(interims) > 1:
1209
            logger.error("More than one interim '{}' found for '{}'"
1210
                         .format(keyword, self.getKeyword()))
1211
            return None
1212
        return interims[0].get('value', '')
1213
1214
    def isRetest(self):
1215
        """Returns whether this analysis is a retest or not
1216
        """
1217
        if self.getRawRetestOf():
1218
            return True
1219
        return False
1220
1221
    def getRetestOfUID(self):
1222
        """Returns the UID of the retracted analysis this is a retest of
1223
        """
1224
        return self.getRawRetestOf()
1225
1226
    def getRawRetest(self):
1227
        """Returns the UID of the retest that comes from this analysis, if any
1228
        """
1229
        relationship = self.getField("RetestOf").relationship
1230
        uids = get_backreferences(self, relationship)
1231
        if not uids:
1232
            return None
1233
        if len(uids) > 1:
1234
            logger.warn("Analysis {} with multiple retests".format(self.id))
1235
        return uids[0]
1236
1237
    def getRetest(self):
1238
        """Returns the retest that comes from this analysis, if any
1239
        """
1240
        retest_uid = self.getRawRetest()
1241
        return api.get_object(retest_uid, default=None)
1242
1243
    def isRetested(self):
1244
        """Returns whether this analysis has been retested or not
1245
        """
1246
        if self.getRawRetest():
1247
            return True
1248
        return False
1249