Passed
Push — 2.x ( 3a5aed...187ea7 )
by Jordi
07:51
created

AbstractRoutineAnalysis.getDependents()   A

Complexity

Conditions 1

Size

Total Lines 15
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 15
rs 10
c 0
b 0
f 0
cc 1
nop 3
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2025 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import copy
22
from collections import OrderedDict
23
from datetime import timedelta
24
25
from AccessControl import ClassSecurityInfo
26
from bika.lims import api
27
from bika.lims import bikaMessageFactory as _
28
from bika.lims.api.analysis import get_dependencies
29
from bika.lims.api.analysis import get_dependents
30
from bika.lims.browser.widgets import DecimalWidget
31
from bika.lims.content.abstractanalysis import AbstractAnalysis
32
from bika.lims.content.abstractanalysis import schema
33
from bika.lims.content.attachment import Attachment
34
from bika.lims.content.clientawaremixin import ClientAwareMixin
35
from bika.lims.interfaces import IAnalysis
36
from bika.lims.interfaces import ICancellable
37
from bika.lims.interfaces import IDynamicResultsRange
38
from bika.lims.interfaces import IInternalUse
39
from bika.lims.interfaces import IRoutineAnalysis
40
from bika.lims.interfaces.analysis import IRequestAnalysis
41
from bika.lims.workflow import getTransitionDate
42
from Products.Archetypes.Field import BooleanField
43
from Products.Archetypes.Field import StringField
44
from Products.Archetypes.Schema import Schema
45
from Products.ATContentTypes.utils import DT2dt
46
from Products.ATContentTypes.utils import dt2DT
47
from Products.CMFCore.permissions import View
48
from senaite.core.catalog.indexer.baseanalysis import sortable_title
49
from senaite.core.permissions import FieldEditAnalysisResult
50
from zope.interface import alsoProvides
51
from zope.interface import implements
52
from zope.interface import noLongerProvides
53
54
# The actual uncertainty for this analysis' result, populated when the result
55
# is submitted.
56
# Declared as a StringField but edited through a DecimalWidget. Reading
# requires the View permission; writing requires FieldEditAnalysisResult.
Uncertainty = StringField(
57
    "Uncertainty",
58
    read_permission=View,
59
    write_permission=FieldEditAnalysisResult,
60
    precision=10,
61
    widget=DecimalWidget(
62
        label=_("Uncertainty")
63
    )
64
)
65
# This field keeps track of whether the field Hidden has been set manually.
66
# If this value is false, the system will assume the visibility of this
67
# analysis in results report depends on the value set at AR, Profile or
68
# Template levels (see AnalysisServiceSettings fields in AR). If the value for
69
# this field is set to true, the system will assume the visibility of the
70
# analysis only depends on the value set for the field Hidden (bool).
71
HiddenManually = BooleanField(
72
    'HiddenManually',
73
    default=False,
74
)
75
76
77
# Rebinds the module-level name `schema` (imported from abstractanalysis) to a
# copy extended with the Uncertainty and HiddenManually fields, so the
# imported schema object itself is not modified.
schema = schema.copy() + Schema((
78
    Uncertainty,
79
    HiddenManually,
80
))
81
82
83
class AbstractRoutineAnalysis(AbstractAnalysis, ClientAwareMixin):
84
    implements(IAnalysis, IRequestAnalysis, IRoutineAnalysis, ICancellable)
85
    security = ClassSecurityInfo()
86
    displayContentsTab = False
87
    schema = schema
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable schema does not seem to be defined.
Loading history...
88
89
    @security.public
90
    def getRequest(self):
        """Returns the Analysis Request this analysis belongs to.

        :return: the object held in ``self.aq_parent``
        """
        return self.aq_parent
96
97
    @security.public
98
    def getRequestID(self):
        """Returns the ID of the parent analysis request.

        Used to populate catalog values.

        :return: the ID of the request, or None if there is no request
        """
        sample = self.getRequest()
        if not sample:
            return None
        return sample.getId()
105
106
    @security.public
107
    def getRequestUID(self):
        """Returns the UID of the parent analysis request.

        :return: the UID of the request, or None if there is no request
        """
        sample = self.getRequest()
        return sample.UID() if sample else None
113
114
    @security.public
115
    def getRequestURL(self):
        """Returns the url path of the Analysis Request this analysis belongs
        to.

        :return: the url path of the request, or None if no request is
            assigned
        :rtype: str
        """
        sample = self.getRequest()
        if not sample:
            return None
        return sample.absolute_url_path()
124
125
    def getClient(self):
        """Returns the Client this analysis is bound to, if any

        :return: the client of the parent request, or None when there is no
            request or the request returns no client
        """
        sample = self.getRequest()
        if not sample:
            return None
        client = sample.getClient()
        return client if client else None
130
131
    @security.public
132
    def getClientOrderNumber(self):
        """Returns the ClientOrderNumber of the associated AR.

        Used to populate catalog values.

        :return: the client order number, or None if there is no request
        """
        sample = self.getRequest()
        return sample.getClientOrderNumber() if sample else None
139
140
    @security.public
141
    def getDateReceived(self):
        """Returns the date the parent Analysis Request was received.

        Used to populate catalog values. If this analysis was created after
        the request was received, the creation date of the analysis is
        returned instead.

        :return: the reception date, the creation date of the analysis if
            later, or None if there is no request
        """
        sample = self.getRequest()
        if not sample:
            return None

        received = sample.getDateReceived()
        if not received or not (self.created() > received):
            # Not received yet, or the analysis already existed on reception
            return received
        return self.created()
154
155
    @security.public
156
    def isSampleReceived(self):
        """Returns whether the Analysis Request this analysis comes from has
        been received or not

        :return: True if the parent request has a reception date, False
            otherwise (also when there is no parent request)
        :rtype: bool
        """
        sample = self.getRequest()
        if not sample:
            # Same guard the other request-delegating getters of this class
            # apply: without a request there is nothing received
            return False
        return bool(sample.getDateReceived())
164
165
    @security.public
166
    def getDatePublished(self):
        """Returns the date on which the "publish" transition was invoked on
        this analysis.

        Used to populate catalog values.

        :return: whatever getTransitionDate returns for the 'publish'
            transition when asked for a datetime
        """
        published = getTransitionDate(
            self, 'publish', return_as_datetime=True)
        return published
172
173
    @security.public
174
    def getDateSampled(self):
        """Returns the date when the Sample was Sampled

        :return: the sampling date of the parent request, or None if there
            is no request
        """
        sample = self.getRequest()
        if not sample:
            return None
        return sample.getDateSampled()
181
182
    @security.public
183
    def isSampleSampled(self):
        """Returns whether the Analysis Request this analysis comes from has
        been sampled or not

        :return: True if getDateSampled() returns a date, False otherwise
        :rtype: bool
        """
        return bool(self.getDateSampled())
188
189
    @security.public
190
    def getStartProcessDate(self):
        """Returns the date time when the analysis is ready to be processed.

        Delegates to getDateReceived(), so the value is None while the
        analysis request has not been received yet.

        :return: Date time when the analysis is ready to be processed.
        :rtype: DateTime
        """
        start = self.getDateReceived()
        return start
199
200
    @security.public
201
    def getSamplePoint(self):
        """Returns the sample point of the parent Analysis Request

        :return: the sample point, or None if there is no request
        """
        sample = self.getRequest()
        return sample.getSamplePoint() if sample else None
206
207
    @security.public
208
    def getDueDate(self):
        """Used to populate getDueDate index and metadata.

        Adds the maximum turnaround time to the date the analysis processing
        started. When the turnaround time spans one day or more, only the
        laboratory workdays configured in setup are counted as days.

        :return: the due date, or None if the analysis has no turnaround time
            set or is not yet ready for process
        """
        tat = self.getMaxTimeAllowed()
        if not tat:
            return None

        # convert only once; a turnaround time of zero means no due date
        tat_minutes = api.to_minutes(**tat)
        if tat_minutes == 0:
            return None

        start = self.getStartProcessDate()
        if not start:
            return None

        # delta time when the first analysis is considered as late
        delta = timedelta(minutes=tat_minutes)

        # calculated due date, counting every calendar day
        end = dt2DT(DT2dt(start) + delta)

        # delta is within one day, return immediately
        if delta.days == 0:
            return end

        # get the laboratory workdays
        setup = api.get_setup()
        workdays = setup.getWorkdays() or ()

        # weekday numbers as strings, "0" being Monday (datetime.weekday())
        weekdays = set(map(str, range(7)))
        configured = weekdays.intersection(workdays)

        # every day is a workday, no need for calculation. The calendar-based
        # date is also returned when no valid workday is configured, because
        # the loop below would never terminate in that case
        if not configured or configured == weekdays:
            return end

        # reset the due date to the start date (plus the sub-day remainder of
        # the turnaround time), and add only for configured workdays another
        # day
        due_date = end - delta.days

        days = 0
        while days < delta.days:
            # add one day to the new due date
            due_date += 1
            # skip if the weekday is a non working day
            if str(due_date.asdatetime().weekday()) not in configured:
                continue
            days += 1

        return due_date
255
256
    @security.public
257
    def getSampleType(self):
        """Returns the sample type of the parent Analysis Request

        :return: the sample type, or None if there is no request
        """
        sample = self.getRequest()
        if not sample:
            return None
        return sample.getSampleType()
262
263
    @security.public
264
    def getSampleTypeUID(self):
        """Returns the raw sample type value of the parent Analysis Request.

        Used to populate catalog values.

        :return: the value of getRawSampleType() from the request, or None
            if there is no request
        """
        sample = self.getRequest()
        return sample.getRawSampleType() if sample else None
271
272
    @security.public
273
    def getResultsRange(self):
        """Returns the valid result range for this routine analysis

        A routine analysis is considered out of range if its result falls
        out of the range defined by "min" and "max". When "warn_min" and
        "warn_max" are set, they define the shoulders at both ends of the
        range, so an analysis can be out of range but still within shoulders.

        :return: A dictionary with keys "min", "max", "warn_min" and "warn_max"
        :rtype: dict
        """
        field = self.getField("ResultsRange")
        return field.get(self)
286
287
    @security.private
288
    def setResultsRange(self, spec, update_dynamic_spec=True):
        """Set the results range for this routine analysis

        NOTE: This custom setter also applies dynamic specifications, and it
        does so by updating the passed-in `spec` in place

        :param spec: The result range to set
        :param update_dynamic_spec: If True, the values returned by the
            IDynamicResultsRange adapter (if any) are merged into `spec`
        """
        dynamic_range = IDynamicResultsRange(self, None)
        if dynamic_range and update_dynamic_spec:
            # update the result range with the dynamic values
            spec.update(dynamic_range())
        self.getField("ResultsRange").set(self, spec)
302
303
    @security.public
304
    def getSiblings(self, with_retests=False):
        """Return the sibling analyses of the current analysis, using the
        parent the analysis belongs to as the source.

        This implementation always raises.

        :param with_retests: If false, siblings with retests are dismissed
        :type with_retests: bool
        :return: list of siblings for this analysis
        :rtype: list of IAnalysis
        :raises NotImplementedError: always
        """
        message = "getSiblings is not implemented."
        raise NotImplementedError(message)
314
315
    @security.public
316
    def getCalculation(self):
        """Return current assigned calculation

        :return: the value of the "Calculation" field, or None if the field
            holds an empty value
        """
        assigned = self.getField("Calculation").get(self)
        return assigned or None
324
325
    @security.public
326
    def setCalculation(self, value):
        """Assign the calculation and re-apply the interim fields

        :param value: the value to store in the "Calculation" field
        """
        field = self.getField("Calculation")
        field.set(self, value)
        # TODO Something weird here
        # Re-set a deep copy of the current interims so they get extended with
        # those from calculation
        # see bika.lims.browser.fields.interimfieldsfield.set
        current_interims = self.getInterimFields()
        self.setInterimFields(copy.deepcopy(current_interims))
333
334
    @security.public
335
    def getDependents(self, with_retests=False, recursive=False):
        """Returns a list of analyses who depend on us to calculate their result.

        I.e. calculated analyses that contain our keyword in their formula.

        :param with_retests: If false, dependents with retests are dismissed
        :param recursive: If true, returns all dependents recursively down
        :type with_retests: bool
        :type recursive: bool
        :return: Analyses that depend on the current analysis
        :rtype: list of IAnalysis
        """
        dependents = get_dependents(
            self, with_retests=with_retests, recursive=recursive)
        # Build a real list, so the documented return type does not rely on
        # map() returning a list
        return [api.get_object(dependent) for dependent in dependents]
349
350
    @security.public
351
    def getDependencies(self, with_retests=False, recursive=False):
        """Return a list of analyses who we depend on to calculate our result.

        :param with_retests: If false, dependencies with retests are dismissed
        :param recursive: If true, looks for dependencies recursively up
        :type with_retests: bool
        :type recursive: bool
        :return: Analyses the current analysis depends on
        :rtype: list of IAnalysis
        """
        dependencies = get_dependencies(
            self, with_retests=with_retests, recursive=recursive)
        # Build a real list, so the documented return type does not rely on
        # map() returning a list
        return [api.get_object(dependency) for dependency in dependencies]
363
364
    @security.public
365
    def getPrioritySortkey(self):
        """Returns the key used to sort the current Analysis by priority.

        The key joins, with dots, the priority sort key of the parent
        request, the lowercased ID of the request and the sortable title of
        this analysis.

        :return: string used for sorting, or None if the request is None
        """
        sample = self.getRequest()
        if sample is None:
            return None

        sample_sort_key = sample.getPrioritySortkey()
        sample_id = sample.getId().lower()

        # sortable_title may hand back a callable instead of the value
        title = sortable_title(self)
        title = title() if callable(title) else title

        return '{}.{}.{}'.format(sample_sort_key, sample_id, title)
380
381
    @security.public
382
    def getHidden(self):
        """Returns whether the analysis must be hidden in results reports, as
        well as in analyses view when the user logged in is a Client Contact.

        If the field HiddenManually is set to True, the value of the field
        Hidden is returned.

        Otherwise, the value of the 'hidden' key from the settings returned
        by getAnalysisServiceSettings() of the Analysis Request is returned,
        or False when the key is missing or there is no request.

        :return: true or false
        :rtype: bool
        """
        if self.getHiddenManually():
            hidden_field = self.getField('Hidden')
            return hidden_field.get(self)

        sample = self.getRequest()
        if not sample:
            return False

        uid = self.getServiceUID()
        settings = sample.getAnalysisServiceSettings(uid)
        return settings.get('hidden', False)
403
        return False
404
405
    @security.public
406
    def setHidden(self, hidden):
        """Sets if this analysis must be displayed or not in results report
        and in manage analyses view if the user is a lab contact as well.

        The value set here has priority over the visibility criteria set at
        Analysis Request, Template or Profile levels (see field
        AnalysisServiceSettings from Analysis Request). To achieve this, the
        setter flags HiddenManually as True before storing the value.

        :param hidden: true if the analysis must be hidden in report
        :type hidden: bool
        """
        self.setHiddenManually(True)
        hidden_field = self.getField('Hidden')
        hidden_field.set(self, hidden)
419
420
    @security.public
421
    def setInternalUse(self, internal_use):
        """Applies the internal use of this Analysis. Analyses set for internal
        use are not accessible to clients and are not visible in reports

        :param internal_use: if true, the analysis is marked with the
            IInternalUse interface; otherwise the marker is removed
        """
        apply_marker = alsoProvides if internal_use else noLongerProvides
        apply_marker(self, IInternalUse)
429
430
    def getConditions(self, empties=False):
        """Returns the conditions of this analysis. These conditions are usually
        set on sample registration and are stored at sample level.

        :param empties: if True, conditions with empty value are returned too
        :return: a deep copy of the service conditions stored in the sample
            that belong to the service of this analysis
        """
        sample = self.getRequest()
        service_uid = self.getRawAnalysisService()
        attachments = self.getRawAttachment()

        def belongs_here(condition):
            # Only conditions bound to the service of this analysis
            uid = condition.get("uid")
            if not api.is_uid(uid) or uid != service_uid:
                return False
            if empties:
                return True
            value = condition.get("value", None)
            if condition.get("type") == "file":
                # file conditions hold a value that must be one of the
                # attachments of this analysis
                return value in attachments
            return value not in [None, ""]

        matching = filter(belongs_here, sample.getServiceConditions())
        return copy.deepcopy(matching)
453
454
    def setConditions(self, conditions):
        """Sets the conditions of this analysis. These conditions are usually
        set on sample registration and are stored at sample level

        Conditions of type "file" are linked to this analysis as attachments.
        Conditions stored in the sample for other services are preserved.

        :param conditions: iterable of condition records (dict-like); records
            without "title" or "type" are discarded
        """
        conditions = conditions or []

        sample = self.getRequest()
        service_uid = self.getRawAnalysisService()
        stored = copy.deepcopy(sample.getServiceConditions())

        # Keep the conditions from sample for analyses other than this one
        others = filter(lambda rec: rec.get("uid") != service_uid, stored)

        def sanitize(raw):
            # Type and title are required
            title = raw.get("title")
            kind = raw.get("type")
            if not (title and kind):
                return None

            # Defaults first, then whatever the incoming record provides
            record = {
                "uid": service_uid,
                "type": kind,
                "title": title,
                "description": "",
                "choices": "",
                "default": "",
                "required": "",
                "report": "",
                "value": "",
            }
            record.update(dict(raw))
            return record

        # Sanitize the conditions
        sanitized = (sanitize(raw) for raw in conditions)
        conditions = [record for record in sanitized if record]

        # Conditions from "file" type are stored as attachments
        linked = []
        for record in conditions:
            if record.get("type") != "file":
                continue

            value = record.get("value")
            previous = record.pop("attachment", None)
            if api.is_uid(value):
                # link to an existing attachment
                linked.append(value)
            elif isinstance(value, Attachment):
                # link to an existing attachment
                uid = api.get_uid(value)
                linked.append(uid)
                record["value"] = uid
            elif getattr(value, "filename", ""):
                # create new attachment
                client = sample.getClient()
                created = api.create(
                    client, "Attachment", AttachmentFile=value)
                created_uid = api.get_uid(created)
                linked.append(created_uid)
                # update with the attachment uid
                record["value"] = created_uid
            elif api.is_uid(previous):
                # restore to original attachment
                linked.append(previous)
                record["value"] = previous
            else:
                # nothing we can handle, update with an empty value
                record["value"] = ""

        # Link the analysis to the attachments
        if linked:
            # Remove duplicates while keeping the order
            linked.extend(self.getRawAttachment() or [])
            self.setAttachment(list(OrderedDict.fromkeys(linked)))

        # Store the conditions in sample
        sample.setServiceConditions(others + conditions)
535
536
    @security.public
537
    def getPrice(self):
        """Obtains the price of the analysis without VAT and without member
        discount.

        The bulk price is used when the client has bulk discount enabled;
        otherwise the value of the field Price is returned.

        :return: the price (without VAT or Member Discount) in decimal format
        """
        client = self.getClient()
        has_bulk_discount = client and client.getBulkDiscount()
        if has_bulk_discount:
            return self.getBulkPrice()
        price_field = self.getField('Price')
        return price_field.get(self)
546