Passed
Push — 2.x ( 380493...9aebce )
by Jordi
06:51
created

senaite.core.datamanagers.content.analysis   A

Complexity

Total Complexity 24

Size/Duplication

Total Lines 151
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 24
eloc 71
dl 0
loc 151
rs 10
c 0
b 0
f 0

6 Methods

Rating   Name   Duplication   Size   Complexity  
B RoutineAnalysisDataManager.recalculate_results() 0 29 8
C RoutineAnalysisDataManager.set() 0 44 9
A RoutineAnalysisDataManager.get() 0 21 4
A RoutineAnalysisDataManager.is_field_readable() 0 4 1
A RoutineAnalysisDataManager.is_field_writeable() 0 4 1
A RoutineAnalysisDataManager.fields() 0 3 1
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2024 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
from AccessControl import Unauthorized
22
from bika.lims import api
23
from bika.lims.interfaces import IReferenceAnalysis
24
from bika.lims.interfaces import IRoutineAnalysis
25
from Products.Archetypes.utils import mapply
26
from senaite.core import logger
27
from senaite.core.datamanagers.base import DataManager
28
from zope.component import adapter
29
30
31
@adapter(IRoutineAnalysis)
32
class RoutineAnalysisDataManager(DataManager):
33
    """Data Manager for Routine Analyses
34
    """
35
36
    @property
37
    def fields(self):
38
        return api.get_fields(self.context)
39
40
    def is_field_readable(self, field):
41
        """Checks if the field is readable
42
        """
43
        return field.checkPermission("get", self.context)
44
45
    def is_field_writeable(self, field):
46
        """Checks if the field is writeable
47
        """
48
        return field.checkPermission("set", self.context)
49
50
    def get(self, name):
51
        """Get analysis field
52
        """
53
        # schema field
54
        field = self.fields.get(name)
55
56
        # check if the field exists
57
        if field is None:
58
            raise AttributeError("Field '{}' not found".format(name))
59
60
        # Check the permission of the field
61
        if not self.is_field_readable(field):
62
            raise Unauthorized("Field '{}' not readable!".format(name))
63
64
        # return the value with the field accessor
65
        if hasattr(field, "getAccessor"):
66
            accessor = field.getAccessor(self.context)
67
            return accessor()
68
        else:
69
            # Set the value on the field directly
70
            return field.get(self.context)
71
72
    def set(self, name, value):
73
        """Set analysis field/interim value
74
        """
75
        # set of updated objects
76
        updated_objects = set()
77
78
        # schema field
79
        field = self.fields.get(name)
80
81
        # interims
82
        interims = self.context.getInterimFields()
83
        interim_keys = map(lambda i: i.get("keyword"), interims)
84
85
        # schema field found
86
        if field:
87
            # Check the permission of the field
88
            if not self.is_field_writeable(field):
89
                logger.error("Field '{}' not writeable!".format(name))
90
                return []
91
            # get the field mutator (works only for AT content types)
92
            if hasattr(field, "getMutator"):
93
                mutator = field.getMutator(self.context)
94
                mapply(mutator, value)
95
            else:
96
                # Set the value on the field directly
97
                field.set(self.context, value)
98
            updated_objects.add(self.context)
99
100
        # interim key found
101
        elif name in interim_keys:
102
            # Check the permission of the field
103
            interim_field = self.fields.get("InterimFields")
104
            if not self.is_field_writeable(interim_field):
105
                logger.error("Interim field '{}' not writeable!".format(name))
106
                return []
107
            self.context.setInterimValue(name, value)
108
109
        # recalculate dependent results for result and interim fields
110
        if name == "Result" or name in interim_keys:
111
            updated_objects.add(self.context)
112
            updated_objects.update(self.recalculate_results(self.context))
113
114
        # return a unified list of the updated objects
115
        return list(updated_objects)
116
117
    def recalculate_results(self, obj, recalculated=None):
118
        """Recalculate the result of the object and its dependents
119
120
        :returns: List of recalculated objects
121
        """
122
        if recalculated is None:
123
            recalculated = set()
124
125
        # avoid double recalculation in recursion
126
        if obj in recalculated:
127
            return set()
128
129
        # recalculate own result
130
        if obj.calculateResult(override=True):
131
            # append object to the list of recalculated results
132
            recalculated.add(obj)
133
        # recalculate dependent analyses
134
        for dep in obj.getDependents():
135
            if dep.calculateResult(override=True):
136
                # TODO the `calculateResult` method should return False here!
137
                if dep.getResult() in ["NA", "0/0"]:
138
                    continue
139
                recalculated.add(dep)
140
            # recalculate dependents of dependents
141
            for ddep in dep.getDependents():
142
                recalculated.update(
143
                    self.recalculate_results(
144
                        ddep, recalculated=recalculated))
145
        return recalculated
146
147
148
@adapter(IReferenceAnalysis)
149
class ReferenceAnalysisDataManager(RoutineAnalysisDataManager):
150
    """Data Manager for Reference Analyses
151
    """
152