Passed
Push — 2.x ( c6b8aa...18af2e )
by Ramon
05:36
created

AbstractAnalysis.getAnalystName()   A

Complexity

Conditions 2

Size

Total Lines 9
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 9
rs 10
c 0
b 0
f 0
cc 2
nop 1
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2021 by its authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import cgi
22
import copy
23
import json
24
import math
25
from decimal import Decimal
26
from six import string_types
27
28
from AccessControl import ClassSecurityInfo
29
from bika.lims import api
30
from bika.lims import bikaMessageFactory as _
31
from bika.lims import deprecated
32
from bika.lims import logger
33
from bika.lims import workflow as wf
34
from bika.lims.browser.fields import HistoryAwareReferenceField
35
from bika.lims.browser.fields import InterimFieldsField
36
from bika.lims.browser.fields import ResultRangeField
37
from bika.lims.browser.fields import UIDReferenceField
38
from bika.lims.browser.fields.uidreferencefield import get_backreferences
39
from bika.lims.browser.widgets import RecordsWidget
40
from bika.lims.config import LDL
41
from bika.lims.config import UDL
42
from bika.lims.content.abstractbaseanalysis import AbstractBaseAnalysis
43
from bika.lims.content.abstractbaseanalysis import schema
44
from bika.lims.interfaces import IDuplicateAnalysis
45
from bika.lims.permissions import FieldEditAnalysisResult
46
from bika.lims.utils import drop_trailing_zeros_decimal
47
from bika.lims.utils import formatDecimalMark
48
from bika.lims.utils.analysis import format_numeric_result
49
from bika.lims.utils.analysis import get_significant_digits
50
from bika.lims.workflow import getTransitionActor
51
from bika.lims.workflow import getTransitionDate
52
from DateTime import DateTime
53
from Products.Archetypes.Field import DateTimeField
54
from Products.Archetypes.Field import FixedPointField
55
from Products.Archetypes.Field import IntegerField
56
from Products.Archetypes.Field import StringField
57
from Products.Archetypes.references import HoldingReference
58
from Products.Archetypes.Schema import Schema
59
from Products.CMFCore.permissions import View
60
61
# Reference (by UID) to the AnalysisService object used to create the analysis
AnalysisService = UIDReferenceField("AnalysisService")

# Attachments added manually through the UI, or automatically when results
# are imported from a file supplied by an instrument.
Attachment = UIDReferenceField(
    "Attachment",
    multiValued=1,
    allowed_types=("Attachment",),
)

# Final result of the analysis. The value is stored as a string, although the
# result itself is required to be numeric. ResultOptions can be used when a
# non-numeric result is needed.
Result = StringField(
    "Result",
    read_permission=View,
    write_permission=FieldEditAnalysisResult,
)

# Updated to the current time whenever the result changes. Only the most
# recent capture date is kept here (and used to populate catalog values); the
# workflow review_history can be used to get all dates of result capture.
ResultCaptureDate = DateTimeField("ResultCaptureDate")

# The retracted analysis this analysis is a retest of
RetestOf = UIDReferenceField("RetestOf")

# Operand (< or >) stored when the result is outside of the detection limits
# of the method or instrument. For routine analyses it is taken from the
# Result, if the result entered explicitly starts with "<" or ">".
DetectionLimitOperand = StringField(
    "DetectionLimitOperand",
    read_permission=View,
    write_permission=FieldEditAnalysisResult,
)

# ID of the logged in user who submitted the result for this Analysis.
Analyst = StringField("Analyst")

# Actual uncertainty for the result of this analysis, populated from the
# ranges specified in the analysis service when the result is submitted.
# NOTE(review): write_permission is a string literal here, while the sibling
# fields use the FieldEditAnalysisResult constant - confirm it is intentional.
Uncertainty = FixedPointField(
    "Uncertainty",
    read_permission=View,
    write_permission="Field: Edit Result",
    precision=10,
)

# Number of verifications required before the analysis is transitioned to a
# 'verified' state. This value is set automatically when the analysis is
# created, based on the value set for the property
# NumberOfRequiredVerifications from the Analysis Service.
NumberOfRequiredVerifications = IntegerField(
    "NumberOfRequiredVerifications",
    default=1,
)

# Routine Analyses and Reference Analyses hold a versioned link to the
# calculation at creation time.
Calculation = HistoryAwareReferenceField(
    "Calculation",
    read_permission=View,
    write_permission=FieldEditAnalysisResult,
    allowed_types=("Calculation",),
    relationship="AnalysisCalculation",
    referenceClass=HoldingReference,
)

# InterimFields are defined in Calculations, Services, and Analyses:
#   - in Analysis Services, the defaults are taken from the Calculation
#   - in Analyses, the defaults are taken from the Analysis Service
# When instrument results are imported, the values in the analysis are
# overridden before the calculation is performed.
InterimFields = InterimFieldsField(
    "InterimFields",
    read_permission=View,
    write_permission=FieldEditAnalysisResult,
    schemata="Method",
    widget=RecordsWidget(
        label=_("Calculation Interim Fields"),
        description=_(
            "Values can be entered here which will override the defaults "
            "specified in the Calculation Interim Fields."
        ),
    ),
)

# Results Range that applies to this analysis
ResultsRange = ResultRangeField(
    "ResultsRange",
    required=0,
)

# Extend a copy of the base analysis schema with the fields declared above
schema = schema.copy() + Schema((
    AnalysisService,
    Analyst,
    Attachment,
    DetectionLimitOperand,
    # NumberOfRequiredVerifications overrides AbstractBaseClass
    NumberOfRequiredVerifications,
    Result,
    ResultCaptureDate,
    RetestOf,
    Uncertainty,
    Calculation,
    InterimFields,
    ResultsRange,
))
177
178
179
class AbstractAnalysis(AbstractBaseAnalysis):
180
    security = ClassSecurityInfo()
181
    displayContentsTab = False
182
    schema = schema
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable schema does not seem to be defined.
Loading history...
183
184
    @deprecated("[1705] Currently returns the Analysis object itself.  If you "
                "need to get the service, use getAnalysisService instead")
    @security.public
    def getService(self):
        """Deprecated accessor that returns this analysis object itself, not
        the service. Use getAnalysisService to obtain the service.
        """
        return self
189
190
    def getServiceUID(self):
        """Returns the raw value of the AnalysisService field, that is, the
        UID of the service associated to this analysis
        """
        service_uid = self.getRawAnalysisService()
        return service_uid
194
195
    @security.public
    def getNumberOfVerifications(self):
        """Returns the number of verifications done so far, counted as the
        number of items returned by getVerificators
        """
        verifiers = self.getVerificators()
        return len(verifiers)
198
199
    @security.public
    def getNumberOfRemainingVerifications(self):
        """Returns the number of verifications still pending: the required
        verifications minus those already done, never below zero
        """
        required = self.getNumberOfRequiredVerifications()
        done = self.getNumberOfVerifications()
        return 0 if done >= required else required - done
206
207
    # TODO Workflow - analysis . Remove?
208
    @security.public
    def getLastVerificator(self):
        """Returns the last item of the list returned by getVerificators, or
        None if the list is empty or its last item is falsy
        """
        verifiers = self.getVerificators()
        if not verifiers:
            return None
        return verifiers[-1] or None
212
213
    @security.public
    def getVerificators(self):
        """Returns the user ids of the users that verified this analysis

        The ids are the actors of the "verify" and "multi_verify" events of
        the review history, in the same order in which wf.getReviewHistory
        returns them.

        :return: actors of the verification events
        :rtype: list
        """
        actions = ("verify", "multi_verify")
        # The ids are deliberately not re-sorted: a bare `sorted(...)` call
        # returns a new list and never changed the result of this function,
        # and getLastVerificator relies on the review history order.
        return [event["actor"] for event in wf.getReviewHistory(self)
                if event["action"] in actions]
224
225
    @security.public
    def getDefaultUncertainty(self, result=None):
        """Returns the uncertainty that applies to the given result, based on
        the uncertainty ranges returned by getUncertainties.

        The first range whose [intercept_min, intercept_max] interval contains
        the result is used. If its 'errorvalue' ends with '%', the uncertainty
        is that percentage of the result; otherwise it is the value itself.

        :param result: the result to evaluate. If None, the current result of
            the analysis is used
        :return: the uncertainty as a float, or None if there are no ranges,
            the result is not a number, no range matches or the percentage
            cannot be converted to a number
        """
        if result is None:
            result = self.getResult()

        ranges = self.getUncertainties()
        if not ranges:
            return None

        try:
            value = float(result)
        except (TypeError, ValueError):
            # No uncertainty can be worked out for a non-numeric result
            return None

        for item in ranges:
            lower = float(item['intercept_min'])
            upper = float(item['intercept_max'])
            if not (lower <= value <= upper):
                continue

            if not str(item['errorvalue']).strip().endswith('%'):
                # Absolute uncertainty
                return float(item['errorvalue'])

            # Uncertainty expressed as a percentage of the result
            try:
                percentage = float(item['errorvalue'].replace('%', ''))
            except ValueError:
                return None
            return value / 100 * percentage

        return None
257
258
    @security.public
    def getUncertainty(self, result=None):
        """Returns the uncertainty for this analysis and result.

        The value stored in the Uncertainty field is returned (as a float)
        when it is set, floatable and manual uncertainty is allowed.
        Otherwise, the value from getDefaultUncertainty is returned.

        :param result: result the uncertainty is wanted for
        :return: the uncertainty, or None if no result was passed in and the
            current result is above the upper or below the lower detection
            limit
        """
        manual = self.getField('Uncertainty').get(self)

        if result is None:
            if (self.isAboveUpperDetectionLimit() or
                    self.isBelowLowerDetectionLimit()):
                return None

        if manual and self.getAllowManualUncertainty() is True:
            try:
                return float(manual)
            except (TypeError, ValueError):
                # Not a number: fall back to the default uncertainty
                pass

        return self.getDefaultUncertainty(result)
279
280
    @security.public
    def setUncertainty(self, unc):
        """Sets the uncertainty for this analysis. If the result is above
        the upper detection limit or below the lower detection limit, None is
        stored instead of the value passed in.

        :param unc: the uncertainty value to store
        """
        # Uncertainty calculation on DL
        # https://jira.bikalabs.com/browse/LIMS-1808
        out_of_range = (self.isAboveUpperDetectionLimit() or
                        self.isBelowLowerDetectionLimit())
        field = self.getField('Uncertainty')
        field.set(self, None if out_of_range else unc)
293
294
    @security.public
    def setDetectionLimitOperand(self, value):
        """Sets the detection limit operand for this analysis and updates the
        result accordingly.

        Does nothing when manual detection limits are not allowed and the
        detection limit selector is not enabled. Any value other than LDL or
        UDL is stored as an empty operand.

        :param value: the operand to set, expected to be LDL or UDL
        """
        manual_dl = self.getAllowManualDetectionLimit()
        selector = self.getDetectionLimitSelector()
        if not (manual_dl or selector):
            # The user can neither type the limit in nor pick it from the
            # selector, so the operand cannot be changed
            return

        # Changing the operand has a side effect on the result
        result = self.getResult()

        if value not in (LDL, UDL):
            value = ""
            # Restore the selector from the service: its visibility might
            # have been changed because manual detection limit is allowed and
            # the user typed a result starting with "<" or ">"
            if manual_dl:
                service = self.getAnalysisService()
                self.setDetectionLimitSelector(
                    service.getDetectionLimitSelector())
        else:
            # flush uncertainty
            self.setUncertainty("")

            # Without a numeric result, or if the user is not allowed to set
            # the detection limit manually, the result becomes the default
            # LDL/UDL
            has_result = api.is_floatable(result)
            if not manual_dl or not has_result:
                if value == LDL:
                    result = self.getLowerDetectionLimit()
                else:
                    result = self.getUpperDetectionLimit()

        # Store the result first and the operand afterwards
        self.getField("Result").set(self, result)
        self.getField("DetectionLimitOperand").set(self, value)
337
338
    # Overrides AbstractBaseAnalysis.getLowerDetectionLimit
339
    @security.public
    def getLowerDetectionLimit(self):
        """Returns the Lower Detection Limit (LDL) that applies to this
        analysis in particular.

        If the detection limit operand of this analysis is LDL, the result
        itself is the limit and is returned as a float. Otherwise, or if that
        result is not floatable, the value from
        AbstractBaseAnalysis.getLowerDetectionLimit is returned.

        :return: the lower detection limit
        """
        if self.isLowerDetectionLimit():
            result = self.getResult()
            try:
                # in this case, the result itself is the LDL.
                return float(result)
            except (TypeError, ValueError):
                # `warning` instead of the deprecated `warn` alias; arguments
                # are passed lazily so they are only formatted when emitted
                logger.warning(
                    "The result for the analysis %s is a lower detection "
                    "limit, but not floatable: '%s'. Returning AS's default "
                    "LDL.", self.id, result)
        return AbstractBaseAnalysis.getLowerDetectionLimit(self)
357
358
    # Overrides AbstractBaseAnalysis.getUpperDetectionLimit
359
    @security.public
    def getUpperDetectionLimit(self):
        """Returns the Upper Detection Limit (UDL) that applies to this
        analysis in particular.

        If the detection limit operand of this analysis is UDL, the result
        itself is the limit and is returned as a float. Otherwise, or if that
        result is not floatable, the value from
        AbstractBaseAnalysis.getUpperDetectionLimit is returned.

        :return: the upper detection limit
        """
        if self.isUpperDetectionLimit():
            result = self.getResult()
            try:
                # in this case, the result itself is the UDL.
                return float(result)
            except (TypeError, ValueError):
                # The message refers to the upper limit (it used to mention
                # the lower one). `warning` replaces the deprecated `warn`
                logger.warning(
                    "The result for the analysis %s is an upper detection "
                    "limit, but not floatable: '%s'. Returning AS's default "
                    "UDL.", self.id, result)
        return AbstractBaseAnalysis.getUpperDetectionLimit(self)
377
378
    @security.public
    def isBelowLowerDetectionLimit(self):
        """Returns True if the detection limit operand is LDL, if the result
        starts with LDL, or if the result is floatable and lower than the
        Lower Detection Limit. Returns False otherwise
        """
        if self.isLowerDetectionLimit():
            return True

        result = self.getResult()
        if result and str(result).strip().startswith(LDL):
            return True

        if not api.is_floatable(result):
            return False

        number = api.to_float(result)
        return number < self.getLowerDetectionLimit()
394
395
    @security.public
    def isAboveUpperDetectionLimit(self):
        """Returns True if the detection limit operand is UDL, if the result
        starts with UDL, or if the result is floatable and greater than the
        Upper Detection Limit. Returns False otherwise
        """
        if self.isUpperDetectionLimit():
            return True

        result = self.getResult()
        if result and str(result).strip().startswith(UDL):
            return True

        if not api.is_floatable(result):
            return False

        number = api.to_float(result)
        return number > self.getUpperDetectionLimit()
411
412
    @security.public
    def getDetectionLimits(self):
        """Returns a two-item list with the detection limits that apply to
        this analysis: the value from getLowerDetectionLimit first and the
        value from getUpperDetectionLimit second
        """
        lower = self.getLowerDetectionLimit()
        upper = self.getUpperDetectionLimit()
        return [lower, upper]
420
421
    @security.public
    def isLowerDetectionLimit(self):
        """Returns True if the detection limit operand of this analysis is
        LDL, that is, the result represents a Lower Detection Limit
        """
        operand = self.getDetectionLimitOperand()
        return operand == LDL
427
428
    @security.public
    def isUpperDetectionLimit(self):
        """Returns True if the detection limit operand of this analysis is
        UDL, that is, the result represents an Upper Detection Limit
        """
        operand = self.getDetectionLimitOperand()
        return operand == UDL
434
435
    @security.public
    def getDependents(self):
        """Placeholder for the lookup of the analyses that depend on this one
        to calculate their result. Always raises in this class.

        :raises NotImplementedError: always
        """
        message = "getDependents is not implemented."
        raise NotImplementedError(message)
440
441
    @security.public
    def getDependencies(self, with_retests=False):
        """Placeholder for the lookup of the siblings this analysis depends
        on to calculate its result. Always raises in this class.

        :param with_retests: If false, siblings with retests are dismissed
        :type with_retests: bool
        :return: Analyses the current analysis depends on
        :rtype: list of IAnalysis
        :raises NotImplementedError: always
        """
        message = "getDependencies is not implemented."
        raise NotImplementedError(message)
450
451
    @security.public
    def setResult(self, value):
        """Validate and set a value into the Result field, taking into
        account the Detection Limits.

        List or tuple values are stored as a JSON string. A value that starts
        with LDL or UDL is split into operand and number; the operand is only
        applied if manual detection limits are allowed or the detection limit
        selector is enabled. The result capture date is reset when the result
        is empty and updated when the result differs from the previous one.

        :param value: is expected to be a string.
        """
        prev_result = self.getField("Result").get(self) or ""

        # Convert to list if the analysis has result options set with multi
        if self.getResultOptions() and "multi" in self.getResultOptionsType():
            if not isinstance(value, (list, tuple)):
                # Build a real list here: `filter` returns a lazy iterator on
                # Python 3, which would skip the list handling below
                value = [value] if value else []

        # Handle list results
        if isinstance(value, (list, tuple)):
            value = json.dumps(value)

        # Ensure result integrity regards to None, empty and 0 values
        val = str("" if not value and value != 0 else value).strip()

        # UDL/LDL directly entered in the results field
        if val and val[0] in [LDL, UDL]:
            # Result prefixed with LDL/UDL
            oper = val[0]
            # Strip off LDL/UDL from the result
            val = val.replace(oper, "", 1)
            # Check if the value is indeterminate / non-floatable
            try:
                val = float(val)
            except (ValueError, TypeError):
                # Keep the value as it was entered, operand included
                val = value

            # We dismiss the operand and the selector visibility unless the
            # user is allowed to manually set the detection limit or the DL
            # selector is visible.
            allow_manual = self.getAllowManualDetectionLimit()
            selector = self.getDetectionLimitSelector()
            if allow_manual or selector:
                # Ensure visibility of the detection limit selector
                self.setDetectionLimitSelector(True)

                # Set the detection limit operand
                self.setDetectionLimitOperand(oper)

                if not allow_manual:
                    # Override value by default DL
                    if oper == LDL:
                        val = self.getLowerDetectionLimit()
                    else:
                        val = self.getUpperDetectionLimit()

        # Update ResultCapture date if necessary
        # NOTE(review): a zero-valued detection limit (e.g. "<0") is the
        # float 0.0 at this point and is therefore handled as "no result" -
        # confirm this is intended
        if not val:
            self.setResultCaptureDate(None)
        elif prev_result != val:
            self.setResultCaptureDate(DateTime())

        # Set the result field
        self.getField("Result").set(self, val)
510
511
    @security.public
    def calculateResult(self, override=False, cascade=False):
        """Calculates the result of this analysis by evaluating the formula
        of its calculation with the values of the interim fields and the
        results of the analyses it depends on.

        :param override: if False, nothing is done when the analysis already
            has a result
        :param cascade: if True, dependencies without a result are asked to
            calculate their own result first
        :return: False if nothing was calculated (a result is already set and
            override is False, there is no calculation, a dependency has no
            result or its values cannot be converted). True once a result has
            been set, including the "NA" and "0/0" values set when the
            evaluation fails
        :rtype: bool
        """
        if self.getResult() and override is False:
            return False

        calculation = self.getCalculation()
        if not calculation:
            return False

        # get the formula from the calculation
        formula = calculation.getMinifiedFormula()

        # Include the current context UID in the mapping, so it can be passed
        # as a param in built-in functions, like 'get_result(%(context_uid)s)'
        mapping = {"context_uid": '"{}"'.format(self.UID())}

        # Interims of the analysis come last, so they take priority over
        # those from the calculation
        for interim in calculation.getInterimFields() + self.getInterimFields():
            name = interim.get("keyword")
            if not name:
                continue

            # skip unset values
            raw = interim.get("value", "")
            if raw == "":
                continue

            # Convert to float if possible
            mapping[name] = float(raw) if api.is_floatable(raw) else raw

        # Add dependencies results to mapping
        for dependency in self.getDependencies():
            result = dependency.getResult()
            # check if the dependency is a string result
            is_string = dependency.getStringResult()
            keyword = dependency.getKeyword()

            if not result:
                # Dependency without results found
                if not cascade:
                    return False
                # Try to calculate the dependency result
                dependency.calculateResult(override, cascade)
                result = dependency.getResult()

            if not result:
                continue

            try:
                # string results have to be quoted because of the `eval` below
                if is_string:
                    result = '"%s"' % result
                else:
                    result = float(str(result))
                ldl = dependency.getLowerDetectionLimit()
                udl = dependency.getUpperDetectionLimit()
                bdl = dependency.isBelowLowerDetectionLimit()
                adl = dependency.isAboveUpperDetectionLimit()
                mapping[keyword] = result
                mapping.update({
                    "%s.%s" % (keyword, "RESULT"): result,
                    "%s.%s" % (keyword, "LDL"): ldl,
                    "%s.%s" % (keyword, "UDL"): udl,
                    "%s.%s" % (keyword, "BELOWLDL"): int(bdl),
                    "%s.%s" % (keyword, "ABOVEUDL"): int(adl),
                })
            except (TypeError, ValueError):
                return False

            # replace placeholder -> formatting string
            # https://docs.python.org/2.7/library/stdtypes.html?highlight=built#string-formatting-operations
            converter = "s" if is_string else "f"
            placeholder = "[" + keyword + "]"
            formula = formula.replace(
                placeholder, "%(" + keyword + ")" + converter)

        # convert any remaining placeholders, e.g. from interims etc.
        # NOTE: we assume remaining values are all floatable!
        formula = formula.replace("[", "%(").replace("]", ")f")

        # Calculate
        # NOTE(review): the formula is run through `eval`, so it has to come
        # from trusted calculation definitions only
        try:
            formula = eval("'%s'%%mapping" % formula,
                           {"__builtins__": None,
                            'math': math,
                            'context': self},
                           {'mapping': mapping})
            result = eval(formula, calculation._getGlobals())
        except ZeroDivisionError:
            self.setResult('0/0')
            return True
        except (TypeError, KeyError, ImportError):
            self.setResult("NA")
            return True

        self.setResult(str(result))
        return True
618
619
    @security.public
    def getPrice(self):
        """Returns the price of the analysis without VAT and without member
        discount.

        The client is looked up as the parent of the parent of this analysis.
        If the client has bulk discount, the bulk price is returned;
        otherwise, the value of the Price field.

        :return: the price (without VAT or Member Discount)
        """
        client = self.aq_parent.aq_parent
        if client.getBulkDiscount():
            return self.getBulkPrice()
        return self.getField('Price').get(self)
632
633
    @security.public
    def getVATAmount(self):
        """Computes the VAT amount without member discount, as the price
        multiplied by the VAT percentage and divided by 100.

        :return: the VAT amount as a Decimal
        """
        vat = self.getVAT()
        net_price = Decimal(self.getPrice())
        vat_rate = Decimal(vat)
        return net_price * vat_rate / 100
641
642
    @security.public
    def getTotalPrice(self):
        """Returns the total price without the client's member discount, as
        the sum of the price and the VAT amount. The client's bulk discount
        is considered, because the price comes from getPrice.

        :return: the total price as a Decimal
        """
        net_price = Decimal(self.getPrice())
        vat_amount = Decimal(self.getVATAmount())
        return net_price + vat_amount
649
650
    @security.public
    def getDuration(self):
        """Returns the time in minutes taken for this analysis.
        If the analysis has no start process date, returns 0
        If the analysis has been verified:
            duration = date_verified - date_start_process
        Otherwise (still in progress):
            duration = current_datetime - date_start_process
        :return: time in minutes taken for this analysis
        :rtype: number (presumably a float, as the difference of two
            DateTime objects is multiplied by 24 * 60 - TODO confirm)
        """
        started = self.getStartProcessDate()
        if not started:
            # The analysis is not yet ready to be processed
            return 0

        finished = self.getDateVerified() or DateTime()

        # The difference is converted to minutes
        return (finished - started) * 24 * 60
670
671
    @security.public
    def getEarliness(self):
        """The remaining time in minutes for this analysis to be completed.
        Returns zero if no maximum turnaround time is set.
            earliness = max_turnaround_time - duration
        The analysis is late if the earliness is negative
        :return: the remaining time in minutes before the analysis reaches TAT
        """
        max_allowed = self.getMaxTimeAllowed()
        if max_allowed:
            return api.to_minutes(**max_allowed) - self.getDuration()
        # No Turnaround time is set for this analysis
        return 0
686
687
    @security.public
    def isLateAnalysis(self):
        """Returns whether the analysis is late in accordance with the
        maximum turnaround time, that is, whether its earliness is negative.
        :return: true if the analysis is late
        :rtype: bool
        """
        earliness = self.getEarliness()
        return earliness < 0
697
698
    @security.public
    def getLateness(self):
        """The time in minutes that exceeds the maximum turnaround set for
        this analysis, computed as the earliness with the sign changed. The
        analysis is not late if the lateness is negative
        :return: the time in minutes that exceeds the maximum turnaround time
        """
        earliness = self.getEarliness()
        return -earliness
708
709
    @security.public
    def isInstrumentAllowed(self, instrument):
        """Checks whether the UID of the instrument passed in is among the
        UIDs of the instruments allowed for this analysis

        :param instrument: string,Instrument
        :return: True if the assignment of the passed in instrument is allowed
        :rtype: bool
        """
        uid = api.get_uid(instrument)
        allowed_uids = [api.get_uid(allowed)
                        for allowed in self.getAllowedInstruments()]
        return uid in allowed_uids
719
720
    @security.public
    def isMethodAllowed(self, method):
        """Checks whether the UID of the method passed in is among the UIDs
        of the methods allowed for this analysis

        :param method: string,Method
        :return: True if the analysis can follow the method specified
        :rtype: bool
        """
        uid = api.get_uid(method)
        allowed_uids = [api.get_uid(allowed)
                        for allowed in self.getAllowedMethods()]
        return uid in allowed_uids
730
731
    @security.public
    def getAllowedMethods(self):
        """Returns the methods allowed for this analysis, which are the
        methods of its analysis service.

        :return: the methods of the service, or an empty list if the analysis
            has no service
        :rtype: list of Methods
        """
        service = self.getAnalysisService()
        return service.getMethods() if service else []
745
746
    @security.public
    def getAllowedInstruments(self):
        """Returns the instruments allowed for this analysis, which are the
        instruments of its analysis service.

        :return: the instruments of the service, or an empty list if the
            analysis has no service
        :rtype: list of instruments
        """
        service = self.getAnalysisService()
        return service.getInstruments() if service else []
757
758
    @security.public
759
    def getExponentialFormatPrecision(self, result=None):
        """Returns the exponential format precision for this analysis and the
        result provided. Results with a precision value above this
        exponential format precision should be formatted as scientific
        notation.

        The fixed exponential precision stored in the schema is returned
        when any of the following applies:

        - no result is provided (None, empty or any other falsy value)
        - "Calculate Precision from Uncertainty" is explicitly False
        - there is no uncertainty for the result
        - the result cannot be converted to a float

        Otherwise, the precision is calculated from the uncertainty of the
        result by means of `get_significant_digits`.

        Examples from the original documentation, given an analysis service
        with a fixed exponential format precision of 4:

        Result      Uncertainty     Returns
        5.234           0.22           0
        13.5            1.34           1
        0.0077          0.008         -3
        32092           0.81           4
        456021          423            5

        For further details, visit https://jira.bikalabs.com/browse/LIMS-1334

        :param result: if provided and "Calculate Precision according to the
            Uncertainty" is set, the result is used to retrieve the
            uncertainty from which the precision is calculated. Otherwise,
            the fixed precision is used.
        :returns: the precision
        """
        # NOTE: identity check on purpose, only an explicit False disables
        # the calculation of the precision from the uncertainty
        if not result or self.getPrecisionFromUncertainty() is False:
            return self._getExponentialFormatPrecision()

        uncertainty = self.getUncertainty(result)
        if uncertainty is None:
            return self._getExponentialFormatPrecision()

        try:
            float(result)
        except (TypeError, ValueError):
            # The result is not a number (non-numeric string or a type that
            # cannot be converted to float): fall back to the fixed precision
            return self._getExponentialFormatPrecision()

        return get_significant_digits(uncertainty)
807
808
    def _getExponentialFormatPrecision(self):
809
        field = self.getField('ExponentialFormatPrecision')
810
        value = field.get(self)
811
        if value is None:
812
            # https://github.com/bikalims/bika.lims/issues/2004
813
            # We require the field, because None values make no sense at all.
814
            value = self.Schema().getField(
815
                'ExponentialFormatPrecision').getDefault(self)
816
        return value
817
818
    @security.public
819
    def getFormattedResult(self, specs=None, decimalmark='.', sciformat=1,
                           html=True):
        """Returns the result of this analysis formatted for display

        The rules are evaluated in this order, the first that applies wins:

        1. If the result is a detection limit (the analysis has a detection
           limit operand), returns '< LDL' or '> UDL'
        2. If the analysis has result options, returns the ResultText of the
           matching option(s); multiple options are joined with '<br/>'
        3. If the result is not floatable, returns it without further
           formatting other than the decimal mark
        4. If the specs have hidemin or hidemax set and the result is out of
           that range, renders the result as '< min' or '> max'
        5. If the result is below the Lower Detection Limit, renders '< LDL'
        6. If the result is above the Upper Detection Limit, renders '> UDL'
        7. Otherwise, renders the numerical value by means of
           `format_numeric_result`

        :param specs: Optional result specifications, a dictionary as follows:
            {'min': <min_val>,
             'max': <max_val>,
             'error': <error>,
             'hidemin': <hidemin_val>,
             'hidemax': <hidemax_val>}
            If empty or not set, the results range of the analysis is used.
        :param decimalmark: The string to be used as a decimal separator.
            default is '.'
        :param sciformat: forwarded to `format_numeric_result`. Per the
            original notes:
                          1. The sci notation has to be formatted as aE^+b
                          2. The sci notation has to be formatted as a·10^b
                          3. As 2, but with super html entity for exp
                          4. The sci notation has to be formatted as a·10^b
                          5. As 4, but with super html entity for exp
                          By default 1
        :param html: if true, the '<' and '>' operators prepended by rules
            1, 4, 5 and 6 are html-escaped (e.g. '&lt; 23.4')
        """
        def out_of_limit(prefix, escaped_prefix, limit):
            # Renders '<prefix><limit>' with the decimal mark applied and,
            # for html output, the leading operator replaced by its entity
            text = formatDecimalMark('%s%s' % (prefix, limit), decimalmark)
            if html:
                return text.replace(prefix, escaped_prefix, 1)
            return text

        result = self.getResult()

        # 1. The result is a detection limit, return '< LDL' or '> UDL'
        operand = self.getDetectionLimitOperand()
        if operand:
            try:
                numeric = float(result)  # required, check if floatable
                numeric = drop_trailing_zeros_decimal(numeric)
                formatted = formatDecimalMark(numeric, decimalmark)
                shown_operand = cgi.escape(operand) if html else operand
                return '%s %s' % (shown_operand, formatted)
            except (TypeError, ValueError):
                logger.warn(
                    "The result for the analysis %s is a detection limit, "
                    "but not floatable: %s" % (self.id, result))
                return formatDecimalMark(result, decimalmark=decimalmark)

        # 2. Print ResultText of matching ResultOptions
        options = self.getResultOptions()
        if options:
            # Map each option value (as string) to its display text
            text_by_value = {}
            for option in options:
                option_value = str(option["ResultValue"])
                text_by_value[option_value] = option["ResultText"]

            # Result might contain a single result option
            single_match = text_by_value.get(str(result))
            if single_match:
                return single_match

            # Result might be a string with multiple options e.g. "['2', '1']"
            try:
                selected = json.loads(result)
                labels = [text_by_value.get(str(item)) for item in selected]
                labels = [label for label in labels if label]
                return "<br/>".join(labels)
            except (ValueError, TypeError):
                # Not a JSON list of options: continue with the next rules
                pass

        # 3. If the result is not floatable, return it without being formatted
        try:
            result = float(result)
        except (TypeError, ValueError):
            return formatDecimalMark(result, decimalmark=decimalmark)

        # 4. If the analysis specs has enabled hidemin or hidemax and the
        #    result is out of range, render result as '<min' or '>max'
        if not specs:
            specs = self.getResultsRange()
        hidemin = specs.get('hidemin', '')
        hidemax = specs.get('hidemax', '')
        try:
            belowmin = bool(hidemin) and result < float(hidemin)
        except (TypeError, ValueError):
            belowmin = False
        try:
            abovemax = bool(hidemax) and result > float(hidemax)
        except (TypeError, ValueError):
            abovemax = False

        # 4.1. If result is below min and hidemin enabled, return '<min'
        if belowmin:
            return out_of_limit('< ', '&lt; ', hidemin)

        # 4.2. If result is above max and hidemax enabled, return '>max'
        if abovemax:
            return out_of_limit('> ', '&gt; ', hidemax)

        # 5. Below Lower Detection Limit (LDL)?
        lower_limit = self.getLowerDetectionLimit()
        if result < lower_limit:
            # LDL must not be formatted according to precision, etc.
            # Drop trailing zeros from decimal
            lower_limit = drop_trailing_zeros_decimal(lower_limit)
            return out_of_limit('< ', '&lt; ', lower_limit)

        # 6. Above Upper Detection Limit (UDL)?
        upper_limit = self.getUpperDetectionLimit()
        if result > upper_limit:
            # UDL must not be formatted according to precision, etc.
            # Drop trailing zeros from decimal
            upper_limit = drop_trailing_zeros_decimal(upper_limit)
            return out_of_limit('> ', '&gt; ', upper_limit)

        # 7. Render numerical values
        return format_numeric_result(self, self.getResult(),
                                     decimalmark=decimalmark,
                                     sciformat=sciformat)
938
939
    @security.public
940
    def getPrecision(self, result=None):
        """Returns the precision for the Analysis.

        - If neither "Allow Manual Uncertainty" nor "Calculate Precision from
          Uncertainty" are set, returns the value of the Precision field.

        - Otherwise, the uncertainty for the result is retrieved with
          `getUncertainty`:

          - If there is no uncertainty (None), returns the value of the
            Precision field.

          - If the uncertainty is zero and no result was passed in, returns
            the value of the Precision field.

          - If the uncertainty is zero, returns the number of decimals the
            result is written with (0 if the result has no decimal point).
            Rationale: if you roll a pair of dice and observe five spots,
            the number of spots is 5. This is a raw data point, with no
            uncertainty whatsoever. So just write down the number.
            Similarly, the number of centimeters per inch is 2.54, by
            definition, with no uncertainty whatsoever. Again: just write
            down the number.

          - Otherwise, returns the significant digits of the uncertainty, as
            calculated by `get_significant_digits`.

        Further information at AbstractBaseAnalysis.getPrecision()

        :param result: the result the precision has to be calculated for
        :returns: the precision
        """
        allow_manual = self.getAllowManualUncertainty()
        precision_unc = self.getPrecisionFromUncertainty()
        if not (allow_manual or precision_unc):
            return self.getField('Precision').get(self)

        uncertainty = self.getUncertainty(result)
        if uncertainty is None:
            return self.getField('Precision').get(self)

        if uncertainty == 0:
            if result is None:
                return self.getField('Precision').get(self)
            # Position of the first '.' in the reversed string, that is the
            # number of characters written after the decimal point
            num_decimals = str(result)[::-1].find('.')
            # find() returns -1 if the result has no decimal point at all
            # (e.g. "5"), which is not a valid precision: no decimals then
            return max(num_decimals, 0)

        return get_significant_digits(uncertainty)
976
977
    @security.public
978
    def getAnalyst(self):
        """Returns the analyst of this analysis

        Resolution order: the value stored in the Analyst field, the analyst
        assigned to the worksheet, the user who submitted the result. Returns
        an empty string if none of them is set.
        """
        analyst = self.getField("Analyst").get(self)
        if not analyst:
            analyst = self.getAssignedAnalyst()
        if not analyst:
            analyst = self.getSubmittedBy()
        return analyst if analyst else ""
985
986
    @security.public
987
    def getAssignedAnalyst(self):
        """Returns the analyst of the worksheet this analysis is assigned to,
        or an empty string if there is no worksheet or it has no analyst
        """
        worksheet = self.getWorksheet()
        if worksheet:
            return worksheet.getAnalyst() or ""
        return ""
995
996
    @security.public
997
    def getSubmittedBy(self):
        """Returns the actor of the 'submit' transition of this analysis, as
        resolved by `getTransitionActor`

        :return: the user_id of the user who did the last submission of result
        """
        submitter = getTransitionActor(self, 'submit')
        return submitter
1004
1005
    @security.public
1006
    def getDateSubmitted(self):
        """Returns the time the result was submitted, that is the date of the
        'submit' transition as resolved by `getTransitionDate`

        :return: a DateTime object.
        :rtype: DateTime
        """
        submitted = getTransitionDate(self, 'submit', return_as_datetime=True)
        return submitted
1012
1013
    @security.public
1014
    def getDateVerified(self):
        """Returns the time the analysis was verified, that is the date of
        the 'verify' transition as resolved by `getTransitionDate`

        :return: the time the analysis was verified; per the original notes,
            None if the analysis has not been verified yet
        :rtype: DateTime
        """
        verified = getTransitionDate(self, 'verify', return_as_datetime=True)
        return verified
1021
1022
    @security.public
1023
    def getStartProcessDate(self):
        """Returns the date time when the analysis is ready to be processed

        This implementation returns the creation date time of the object.
        Per the original notes, the value might differ depending on the type
        of analysis (e.g. "Date Received" for routine analyses): see the
        overriding functions.

        :return: Date time when the analysis is ready to be processed.
        :rtype: DateTime
        """
        created = self.created()
        return created
1032
1033
    @security.public
1034
    def getParentURL(self):
        """Returns the URL path of the parent of this analysis, or None if
        the analysis has no parent

        This method is used to populate catalog values
        """
        parent = self.aq_parent
        if not parent:
            return None
        return parent.absolute_url_path()
1041
1042
    @security.public
1043
    def getWorksheetUID(self):
        """Returns the UID of the worksheet this analysis is assigned to

        This method is used to populate catalog values

        :return: the only UID found in the "WorksheetAnalysis" back
            references of this analysis. None if there are no back
            references, or if there is more than one (an error is logged)
        """
        ws_uids = get_backreferences(self, relationship="WorksheetAnalysis")
        if not ws_uids:
            return None

        if len(ws_uids) <= 1:
            return ws_uids[0]

        # Ambiguous assignment, cannot tell which worksheet is the good one
        logger.error("More than one worksheet: {}".format(api.get_path(self)))
        return None
1057
1058
    @security.public
1059
    def getWorksheet(self):
        """Returns the Worksheet this analysis is assigned to

        The worksheet is looked up by the UID returned by `getWorksheetUID`,
        with None as the default value for the lookup.
        """
        uid = self.getWorksheetUID()
        worksheet = api.get_object_by_uid(uid, None)
        return worksheet
1064
1065
    @security.public
1066
    def remove_duplicates(self, ws):
        """Removes from the worksheet the duplicates of this analysis

        When this analysis is unassigned from a worksheet, this function is
        responsible for removing from the worksheet the objects it contains
        that provide IDuplicateAnalysis and whose source analysis has the
        same UID as this analysis.

        :param ws: the worksheet to remove the duplicates from
        """
        # NOTE(review): removeAnalysis is called while iterating over
        # ws.objectValues() - confirm objectValues() returns a snapshot
        for candidate in ws.objectValues():
            if not IDuplicateAnalysis.providedBy(candidate):
                continue
            if candidate.getAnalysis().UID() != self.UID():
                continue
            ws.removeAnalysis(candidate)
1074
1075
    def setInterimValue(self, keyword, value):
        """Sets the value of the interim field(s) with the given keyword

        The interim fields are deep-copied, the value of the records that
        match with the keyword is replaced and the whole set is stored again
        with `setInterimFields`.

        :param keyword: the keyword of the interim
        :param value: the value for the interim. None is stored as an empty
            string, strings are stripped, lists, tuples, sets and dicts are
            dumped as JSON. The value is always stored as a string
        """
        # Ensure value format integrity
        if value is None:
            value = ""
        elif isinstance(value, string_types):
            value = value.strip()
        elif isinstance(value, (list, tuple, set, dict)):
            value = json.dumps(value)

        # Work on a copy, so the records currently stored are not modified
        # in place
        records = copy.deepcopy(self.getInterimFields())
        matching = [rec for rec in records if rec.get("keyword") == keyword]
        for record in matching:
            record["value"] = str(value)
        self.setInterimFields(records)
1094
1095
    def getInterimValue(self, keyword):
        """Returns the value of the interim field with the given keyword

        :param keyword: the keyword of the interim
        :return: the value of the interim, or an empty string if the interim
            has no value set. Returns None if no interim (a warning is
            logged) or more than one interim (an error is logged) exists
            for the keyword
        """
        # Build a real list (the result of filter() is not a list in
        # Python 3) and rely on get() to skip records without keyword, same
        # as setInterimValue does
        matches = [interim for interim in self.getInterimFields()
                   if interim.get("keyword") == keyword]
        if not matches:
            logger.warning("Interim '{}' for analysis '{}' not found"
                           .format(keyword, self.getKeyword()))
            return None
        if len(matches) > 1:
            logger.error("More than one interim '{}' found for '{}'"
                         .format(keyword, self.getKeyword()))
            return None
        return matches[0].get('value', '')
1109
1110
    def isRetest(self):
        """Returns True if this analysis is a retest of another analysis,
        False otherwise
        """
        return bool(self.getRetestOf())
1114
1115
    def getRetestOfUID(self):
        """Returns the UID of the analysis this analysis is a retest of, or
        None if this analysis is not a retest
        """
        source = self.getRetestOf()
        if not source:
            return None
        return api.get_uid(source)
1121
1122
    def getRetest(self):
        """Returns the retest that comes from this analysis, if any

        The retest is the object the first "<portal_type>RetestOf" back
        reference of this analysis points to. A warning is logged if there
        is more than one back reference and an error is logged if the object
        cannot be found.

        :return: the retest, or None if there is no retest or the retest
            cannot be found
        """
        relationship = "{}RetestOf".format(self.portal_type)
        retest_uids = get_backreferences(self, relationship)
        if not retest_uids:
            return None

        if len(retest_uids) > 1:
            logger.warn("Analysis {} with multiple retests".format(self.id))

        # Only the first back reference is considered
        first_uid = retest_uids[0]
        retest = api.get_object_by_uid(first_uid, default=None)
        if retest is None:
            logger.error("Retest with UID {} not found".format(first_uid))
        return retest
1136