Passed
Pull Request — 2.x (#1778)
by Ramon
05:34
created

RoutineAnalysisDataManager.get()   A

Complexity

Conditions 4

Size

Total Lines 21
Code Lines 10

Duplication

Lines 21
Ratio 100 %

Importance

Changes 0
Metric Value
eloc 10
dl 21
loc 21
rs 9.9
c 0
b 0
f 0
cc 4
nop 2
1
# -*- coding: utf-8 -*-
2
3
from AccessControl import Unauthorized
4
from bika.lims import api
5
from bika.lims.interfaces import IReferenceAnalysis
6
from bika.lims.interfaces import IRoutineAnalysis
7
from Products.Archetypes.utils import mapply
8
from senaite.core import logger
9
from senaite.core.datamanagers import DataManager
10
from zope.component import adapter
11
12
13
@adapter(IRoutineAnalysis)
14
class RoutineAnalysisDataManager(DataManager):
15
    """Data Manager for Routine Analyses
16
    """
17
18
    @property
19
    def fields(self):
20
        return api.get_fields(self.context)
21
22
    def is_field_readable(self, field):
23
        """Checks if the field is readable
24
        """
25
        return field.checkPermission("get", self.context)
26
27
    def is_field_writeable(self, field):
28
        """Checks if the field is writeable
29
        """
30
        return field.checkPermission("set", self.context)
31
32 View Code Duplication
    def get(self, name):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
33
        """Get analysis field
34
        """
35
        # schema field
36
        field = self.fields.get(name)
37
38
        # check if the field exists
39
        if field is None:
40
            raise AttributeError("Field '{}' not found".format(name))
41
42
        # Check the permission of the field
43
        if not self.is_field_readable(field):
44
            raise Unauthorized("Field '{}' not readable!".format(name))
45
46
        # return the value with the field accessor
47
        if hasattr(field, "getAccessor"):
48
            accessor = field.getAccessor(self.context)
49
            return accessor()
50
        else:
51
            # Set the value on the field directly
52
            return field.get(self.context)
53
54
    def set(self, name, value):
55
        """Set analysis field/interim value
56
        """
57
        # set of updated objects
58
        updated_objects = set()
59
60
        # schema field
61
        field = self.fields.get(name)
62
63
        # interims
64
        interims = self.context.getInterimFields()
65
        interim_keys = map(lambda i: i.get("keyword"), interims)
66
67
        # schema field found
68
        if field:
69
            # Check the permission of the field
70
            if not self.is_field_writeable(field):
71
                logger.error("Field '{}' not writeable!".format(name))
72
                return []
73
            # get the field mutator (works only for AT content types)
74
            if hasattr(field, "getMutator"):
75
                mutator = field.getMutator(self.context)
76
                mapply(mutator, value)
77
            else:
78
                # Set the value on the field directly
79
                field.set(self.context, value)
80
            updated_objects.add(self.context)
81
82
        # interim key found
83
        elif name in interim_keys:
84
            # Check the permission of the field
85
            interim_field = self.fields.get("InterimFields")
86
            if not self.is_field_writeable(interim_field):
87
                logger.error("Interim field '{}' not writeable!".format(name))
88
                return []
89
            for interim in interims:
90
                if interim.get("keyword") == name:
91
                    interim["value"] = value
92
            self.context.setInterimFields(interims)
93
94
        # recalculate dependent results for result and interim fields
95
        if name == "Result" or name in interim_keys:
96
            updated_objects.add(self.context)
97
            updated_objects.update(self.recalculate_results(self.context))
98
99
        # return a unified list of the updated objects
100
        return list(updated_objects)
101
102
    def recalculate_results(self, obj, recalculated=None):
103
        """Recalculate the result of the object and its dependents
104
105
        :returns: List of recalculated objects
106
        """
107
        if recalculated is None:
108
            recalculated = set()
109
110
        # avoid double recalculation in recursion
111
        if obj in recalculated:
112
            return set()
113
114
        # recalculate own result
115
        if obj.calculateResult(override=True):
116
            # append object to the list of recalculated results
117
            recalculated.add(obj)
118
        # recalculate dependent analyses
119
        for dep in obj.getDependents():
120
            if dep.calculateResult(override=True):
121
                # TODO the `calculateResult` method should return False here!
122
                if dep.getResult() in ["NA", "0/0"]:
123
                    continue
124
                recalculated.add(dep)
125
            # recalculate dependents of dependents
126
            for ddep in dep.getDependents():
127
                recalculated.update(
128
                    self.recalculate_results(
129
                        ddep, recalculated=recalculated))
130
        return recalculated
131
132
133
@adapter(IReferenceAnalysis)
134
class ReferenceAnalysisDataManager(RoutineAnalysisDataManager):
135
    """Data Manager for Reference Analyses
136
    """
137