Passed
Push — 2.x ( 380493...9aebce )
by Jordi
06:51
created

senaite.core.datamanagers.content.sample   A

Complexity

Total Complexity 21

Size/Duplication

Total Lines 166
Duplicated Lines 20.48 %

Importance

Changes 0
Metric Value
wmc 21
eloc 75
dl 34
loc 166
rs 10
c 0
b 0
f 0

9 Methods

Rating   Name   Duplication   Size   Complexity  
A SampleDataManager.fields() 0 3 1
A SampleDataManager.set_analysis_result() 0 24 3
B SampleDataManager.set() 0 36 6
A SampleDataManager.is_field_writeable() 0 6 2
A SampleDataManager.is_request_from_batchbook() 0 11 1
A SampleDataManager.is_field_readable() 0 4 1
A SampleDataManager.get_analysis_by_id() 0 4 1
A SampleDataManager.get_field_by_name() 13 13 2
A SampleDataManager.get() 21 21 4

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2024 by its authors.
19
# Some rights reserved, see README and LICENSE.
20
21
from AccessControl import Unauthorized
22
from bika.lims import api
23
from bika.lims.interfaces import IAnalysisRequest
24
from bika.lims.interfaces import IBatchBookView
25
from Products.Archetypes.utils import mapply
26
from senaite.core import logger
27
from senaite.core.datamanagers.base import DataManager
28
from zope.component import adapter
29
30
31
@adapter(IAnalysisRequest)
class SampleDataManager(DataManager):
    """Data Manager for Samples

    Reads and writes schema field values of the adapted sample
    (`self.context`). When the request comes from the batchbook view, `set`
    writes the result of a contained analysis instead of a sample field.
    """

    @property
    def fields(self):
        """Schema fields of the sample as returned by `api.get_fields`
        """
        return api.get_fields(self.context)

    def is_field_readable(self, field):
        """Checks if the field is readable

        :param field: schema field to check
        :returns: outcome of the field's "get" permission check on the sample
        """
        return field.checkPermission("get", self.context)

    def is_field_writeable(self, field, context=None):
        """Checks if the field is writeable

        :param field: schema field to check
        :param context: object to check the permission on; defaults to the
            sample
        :returns: outcome of the field's "set" permission check
        """
        if context is None:
            context = self.context
        return field.checkPermission("set", context)

    def get_field_by_name(self, name):
        """Get the field by name

        :param name: name of the field, optionally prefixed with `get`
        :returns: the matching schema field or None
        """
        field = self.fields.get(name)

        # try to fetch the field w/o the `get` prefix
        # this might be the case in some listings
        if field is None and name.startswith("get"):
            # strip the leading `get` only: splitting on the first `get`
            # found anywhere in the name could resolve an unrelated field
            # (e.g. `BudgetClient` -> `Client`)
            field = self.fields.get(name[len("get"):])

        return field

    def get(self, name):
        """Get sample field

        :param name: name of the field, optionally prefixed with `get`
        :returns: the value of the field
        :raises AttributeError: if no field was found for the name
        :raises Unauthorized: if the field is not readable
        """
        # get the schema field
        field = self.get_field_by_name(name)

        # check if the field exists
        if field is None:
            raise AttributeError("Field '{}' not found".format(name))

        # Check the permission of the field
        if not self.is_field_readable(field):
            raise Unauthorized("Field '{}' not readable!".format(name))

        # return the value with the field accessor
        if hasattr(field, "getAccessor"):
            accessor = field.getAccessor(self.context)
            return accessor()

        # no accessor available: read the value from the field directly
        return field.get(self.context)

    def set_analysis_result(self, name, value):
        """Handle Batchbook Results

        :param name: ID of the contained analysis
        :param value: Result of the Analysis

        :returns: True if the result was set, otherwise False
        """
        analysis = self.get_analysis_by_id(name)
        if not analysis:
            logger.error("Analysis '{}' not found".format(name))
            return False

        fields = api.get_fields(analysis)
        field = fields.get("Result")

        # the object found by ID might not provide a `Result` field; bail
        # out instead of failing on the permission check of `None`
        if field is None:
            logger.error("Field 'Result' not found in '{}'".format(name))
            return False

        # Check the permission of the field
        if not self.is_field_writeable(field, analysis):
            logger.error("Field '{}' not writeable!".format(name))
            return False

        # set the analysis result with the mutator
        analysis.setResult(value)
        analysis.reindexObject()
        return True

    def get_analysis_by_id(self, name):
        """Get the analysis by ID

        :param name: ID of the object contained in the sample
        :returns: whatever `self.context.get` returns for the ID
        """
        return self.context.get(name)

    def is_request_from_batchbook(self):
        """Checks if the request is coming from the batchbook

        If this is the case, the `name` does not refer to a field,
        but to an analysis and the `value` is always the result.

        :returns: True if the request was sent from the batchbook
        """
        req = api.get_request()

        # the view is looked up as the first entry of the request's
        # `PARENTS`; a request without parents can not come from the batchbook
        parents = getattr(req, "PARENTS", None)
        if not parents:
            return False

        return IBatchBookView.providedBy(parents[0])

    def set(self, name, value):
        """Set sample field or analysis result

        :param name: name of the sample field, or the ID of the contained
            analysis if the request comes from the batchbook
        :param value: value to set
        :returns: list of updated objects; empty if the field is not
            writeable
        :raises ValueError: if the analysis result could not be set
        :raises AttributeError: if no field was found for the name
        """
        # handle batchbook results set
        if self.is_request_from_batchbook():
            success = self.set_analysis_result(name, value)
            if not success:
                raise ValueError(
                    "Failed to set analysis result for '%s'" % name)
            return [self.context]

        # get the schema field
        field = self.get_field_by_name(name)

        if field is None:
            raise AttributeError("Field '{}' not found".format(name))

        # Check the permission of the field
        if not self.is_field_writeable(field):
            logger.error("Field '{}' not writeable!".format(name))
            return []

        # get the field mutator (works only for AT content types)
        if hasattr(field, "getMutator"):
            mutator = field.getMutator(self.context)
            mapply(mutator, value)
        else:
            # Set the value on the field directly
            field.set(self.context, value)

        # the sample is the only object updated here
        return [self.context]
166