Passed
Pull Request — 2.x (#1966)
by Ramon
04:36
created

SampleDataManager.is_field_writeable()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nop 2
1
# -*- coding: utf-8 -*-
2
3
from AccessControl import Unauthorized
4
from bika.lims import api
5
from bika.lims.interfaces import IAnalysisRequest
6
from bika.lims.interfaces import IBatchBookView
7
from Products.Archetypes.utils import mapply
8
from senaite.core import logger
9
from senaite.core.datamanagers import DataManager
10
from zope.component import adapter
11
12
13
@adapter(IAnalysisRequest)
14
class SampleDataManager(DataManager):
15
    """Data Manager for Samples
16
    """
17
18
    @property
19
    def fields(self):
20
        return api.get_fields(self.context)
21
22
    def is_field_readable(self, field):
23
        """Checks if the field is readable
24
        """
25
        return field.checkPermission("get", self.context)
26
27
    def is_field_writeable(self, field):
28
        """Checks if the field is writeable
29
        """
30
        return field.checkPermission("set", self.context)
31
32
    def get_field_by_name(self, name):
33
        """Get the field by name
34
        """
35
        field = self.fields.get(name)
36
37
        # try to fetch the field w/o the `get` prefix
38
        # this might be the case is some listings
39
        if field is None:
40
            # ensure we do not have the field setter as column
41
            name = name.split("get", 1)[-1]
42
            field = self.fields.get(name)
43
44
        return field
45
46
    def get(self, name):
47
        """Get sample field
48
        """
49
        # get the schema field
50
        field = self.get_field_by_name(name)
51
52
        # check if the field exists
53
        if field is None:
54
            raise AttributeError("Field '{}' not found".format(name))
55
56
        # Check the permission of the field
57
        if not self.is_field_readable(field):
58
            raise Unauthorized("Field '{}' not readable!".format(name))
59
60
        # return the value with the field accessor
61
        if hasattr(field, "getAccessor"):
62
            accessor = field.getAccessor(self.context)
63
            return accessor()
64
        else:
65
            # Set the value on the field directly
66
            return field.get(self.context)
67
68
    def set_analysis_result(self, name, value):
69
        """Handle Batchbook Results
70
71
        :param name: ID of the contained analysis
72
        :param value: Result of the Analysis
73
74
        :returns: True if the result was set, otherwise False
75
        """
76
        analysis = self.get_analysis_by_id(name)
77
        if not analysis:
78
            logger.error("Analysis '{}' not found".format(name))
79
            return False
80
81
        fields = api.get_fields(analysis)
82
        field = fields.get("Result")
83
        # Check the permission of the field
84
        if not self.is_field_writeable(field):
85
            logger.error("Field '{}' not writeable!".format(name))
86
            return False
87
88
        # set the analysis result with the mutator
89
        analysis.setResult(value)
90
        analysis.reindexObject()
91
        return True
92
93
    def get_analysis_by_id(self, name):
94
        """Get the analysis by ID
95
        """
96
        return self.context.get(name)
97
98
    def is_request_from_batchbook(self):
99
        """Checks if the request is coming from the batchbook
100
101
        If this is the case, the `name` does not refer to a field,
102
        but to an analysis and the `value` is always the result.
103
104
        :returns: True if the request was sent from the batchbook
105
        """
106
        req = api.get_request()
107
        view = req.PARENTS[0]
108
        return IBatchBookView.providedBy(view)
109
110
    def set(self, name, value):
111
        """Set sample field or analysis result
112
        """
113
        # set of updated objects
114
        updated_objects = set()
115
116
        # handle batchbook results set
117
        if self.is_request_from_batchbook():
118
            success = self.set_analysis_result(name, value)
119
            if not success:
120
                raise ValueError(
121
                    "Failed to set analysis result for '%s'" % name)
122
            return [self.context]
123
124
        # get the schema field
125
        field = self.get_field_by_name(name)
126
127
        if field is None:
128
            raise AttributeError("Field '{}' not found".format(name))
129
130
        # Check the permission of the field
131
        if not self.is_field_writeable(field):
132
            logger.error("Field '{}' not writeable!".format(name))
133
            return []
134
        # get the field mutator (works only for AT content types)
135
        if hasattr(field, "getMutator"):
136
            mutator = field.getMutator(self.context)
137
            mapply(mutator, value)
138
        else:
139
            # Set the value on the field directly
140
            field.set(self.context, value)
141
142
        updated_objects.add(self.context)
143
144
        # return a unified list of the updated objects
145
        return list(updated_objects)
146