Pull Request — 2.x (#1777)
by Jordi

bika.lims.browser.fields.aranalysesfield   C

Complexity

Total Complexity 54

Size/Duplication

Total Lines 418
Duplicated Lines 4.31 %

Importance

Changes 0
Metric Value
wmc 54
eloc 201
dl 18
loc 418
rs 6.4799
c 0
b 0
f 0

14 Methods

Rating   Name   Duplication   Size   Complexity  
A ARAnalysesField.remove_analysis() 0 31 5
A ARAnalysesField.get_from_ancestor() 0 9 2
A ARAnalysesField._to_service() 0 31 5
C ARAnalysesField.set() 0 60 9
A ARAnalysesField.get() 0 25 2
A ARAnalysesField.get_from_descendant() 0 15 3
A ARAnalysesField.resolve_uid() 18 18 5
A ARAnalysesField.resolve_specs() 0 21 5
A ARAnalysesField.resolve_range() 0 18 4
A ARAnalysesField.get_analyses_from_descendants() 0 7 2
B ARAnalysesField.add_analysis() 0 48 5
A ARAnalysesField.resolve_analyses() 0 29 3
A ARAnalysesField.generate_analysis_id() 0 10 2
A ARAnalysesField.get_from_instance() 0 8 2


Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
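
As a minimal sketch of removing such a duplicate, the block below moves a lookup shaped like the flagged resolve_uid() into one module-level function that every caller can share. The callables is_uid and find_uid_by_keyword, and the fake_* stand-ins, are hypothetical names introduced for illustration; they are assumptions, not part of the bika.lims API.

```python
def resolve_uid(result_range, is_uid, find_uid_by_keyword):
    """Return a copy of result_range with its "uid" entry resolved.

    is_uid and find_uid_by_keyword are hypothetical callables that stand in
    for the catalog-backed checks used in the listing.
    """
    value = dict(result_range)
    uid = value.get("uid")
    if is_uid(uid) and uid != "0":
        # Already a usable uid, nothing to resolve
        return value

    # uid is missing or not valid, try to infer it from the keyword
    keyword = value.get("keyword")
    if keyword:
        found = find_uid_by_keyword(keyword)
        if found is not None:
            uid = found
    value["uid"] = uid
    return value


# Stand-ins for demonstration only (assumed, not real catalog lookups)
KNOWN_SERVICES = {"Ca": "a" * 32}


def fake_is_uid(uid):
    return isinstance(uid, str) and len(uid) == 32


def fake_find(keyword):
    return KNOWN_SERVICES.get(keyword)


resolved = resolve_uid({"keyword": "Ca", "min": 1}, fake_is_uid, fake_find)
unchanged = resolve_uid({"uid": "b" * 32}, fake_is_uid, fake_find)
```

Passing the lookups in as arguments keeps the shared function free of any dependency on a particular field class, so each former copy can be replaced by a one-line call.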

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like bika.lims.browser.fields.aranalysesfield often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
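
A rough sketch of Extract Class applied to the methods that share the get_from_ prefix in the listing might look as follows. FakeSample and AnalysisLocator are hypothetical names, and the tuples used for analyses are a simplification; none of this is the real AnalysisRequest API.

```python
class FakeSample(object):
    """Minimal stand-in for a sample; an assumption for this sketch only."""

    def __init__(self, analyses, parent=None):
        self.analyses = analyses  # list of (service_uid, analysis_id) tuples
        self.parent = parent
        self.children = []
        if parent is not None:
            parent.children.append(self)


class AnalysisLocator(object):
    """Cohesive component extracted from the get_from_* methods."""

    def __init__(self, service_uid):
        self.service_uid = service_uid

    def from_instance(self, sample):
        # Analyses of the sample itself that belong to the service
        return [a for a in sample.analyses if a[0] == self.service_uid]

    def from_ancestor(self, sample):
        # Walk up until an ancestor holds a matching analysis
        ancestor = sample.parent
        if ancestor is None:
            return []
        return self.from_instance(ancestor) or self.from_ancestor(ancestor)

    def from_descendant(self, sample):
        # Collect matching analyses from all descendants, depth first
        found = []
        for child in sample.children:
            found.extend(self.from_instance(child))
            found.extend(self.from_descendant(child))
        return found


root = FakeSample([("svc-1", "Ca")])
partition = FakeSample([("svc-2", "Mg")], parent=root)
leaf = FakeSample([("svc-1", "Ca-1")], parent=partition)

locator = AnalysisLocator("svc-1")
from_above = locator.from_ancestor(leaf)
from_below = locator.from_descendant(root)
```

The field class would then delegate to one locator per service instead of carrying the three lookups itself, which lowers its method count and complexity total.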

# -*- coding: utf-8 -*-
#
# This file is part of SENAITE.CORE.
#
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright 2018-2021 by its authors.
# Some rights reserved, see README and LICENSE.

import itertools

from AccessControl import ClassSecurityInfo
from AccessControl import Unauthorized
from bika.lims import api
from bika.lims import logger
from bika.lims.api.security import check_permission
from bika.lims.catalog import CATALOG_ANALYSIS_LISTING
from bika.lims.catalog import SETUP_CATALOG
from bika.lims.interfaces import IAnalysis
from bika.lims.interfaces import IAnalysisService
from bika.lims.interfaces import IARAnalysesField
from bika.lims.interfaces import ISubmitted
from bika.lims.permissions import AddAnalysis
from bika.lims.utils.analysis import create_analysis
from Products.Archetypes.public import Field
from Products.Archetypes.public import ObjectField
from Products.Archetypes.Registry import registerField
from zope.interface import implements

DETACHED_STATES = ["cancelled", "retracted", "rejected"]


"""Field to manage Analyses on ARs

Please see the assigned doctest at tests/doctests/ARAnalysesField.rst

Run this test from the buildout directory:

    bin/test test_textual_doctests -t ARAnalysesField
"""


class ARAnalysesField(ObjectField):
    """A field that stores Analyses instances
    """
    implements(IARAnalysesField)

    security = ClassSecurityInfo()
    _properties = Field._properties.copy()
    _properties.update({
        "type": "analyses",
        "default": None,
    })

    security.declarePrivate('get')

    def get(self, instance, **kwargs):
        """Returns a list of Analyses assigned to this AR

        Return a list of catalog brains unless `full_objects=True` is passed.
        Other keyword arguments are passed to bika_analysis_catalog

        :param instance: Analysis Request object
        :param kwargs: Keyword arguments to inject in the search query
        :returns: A list of Analysis Objects/Catalog Brains
        """
        # Do we need to return objects or brains
        full_objects = kwargs.get("full_objects", False)

        # Drop parameters from kwargs that do not match a catalog index
        catalog = api.get_tool(CATALOG_ANALYSIS_LISTING)
        indexes = catalog.indexes()
        query = dict([(k, v) for k, v in kwargs.items() if k in indexes])

        # Do the search against the catalog
        query["portal_type"] = "Analysis"
        query["getAncestorsUIDs"] = api.get_uid(instance)
        brains = catalog(query)
        if full_objects:
            return map(api.get_object, brains)
        return brains

    security.declarePrivate('set')

    def set(self, instance, items, prices=None, specs=None, hidden=None, **kw):
        """Set/Assign Analyses to this AR

        :param items: List of Analysis objects/brains, AnalysisService
                      objects/brains and/or Analysis Service uids
        :type items: list
        :param prices: Mapping of AnalysisService UID -> price
        :type prices: dict
        :param specs: List of AnalysisService UID -> Result Range mappings
        :type specs: list
        :param hidden: List of AnalysisService UID -> Hidden mappings
        :type hidden: list
        :returns: None (analyses are added and removed in place)
        """
        if items is None:
            items = []

        # Bail out if items is not a list or tuple
        if not isinstance(items, (list, tuple)):
            raise TypeError(
                "Items parameter must be a tuple or list, got '{}'".format(
                    type(items)))

        # Bail out if the AR is inactive
        if not api.is_active(instance):
            raise Unauthorized("Inactive ARs can not be modified")

        # Bail out if the user does not have the required permission
        if not check_permission(AddAnalysis, instance):
            raise Unauthorized("You do not have the '{}' permission"
                               .format(AddAnalysis))

        # Convert the items to a valid list of AnalysisServices
        services = filter(None, map(self._to_service, items))

        # Calculate dependencies
        dependencies = map(lambda s: s.getServiceDependencies(), services)
        dependencies = list(itertools.chain.from_iterable(dependencies))

        # Merge dependencies and services
        services = set(services + dependencies)

        # Modify existing AR specs with new form values of selected analyses
        specs = self.resolve_specs(instance, specs)

        # Add analyses
        params = dict(prices=prices, hidden=hidden, specs=specs)
        map(lambda serv: self.add_analysis(instance, serv, **params), services)

        # Get all analyses (those from descendants included)
        analyses = instance.objectValues("Analysis")
        analyses.extend(self.get_analyses_from_descendants(instance))

        # Select for removal the analyses whose service is not in the services
        # list, skipping those that have already been submitted
        uids = map(api.get_uid, services)
        to_remove = filter(lambda an: an.getServiceUID() not in uids, analyses)
        to_remove = filter(lambda an: not ISubmitted.providedBy(an), to_remove)

        # Remove analyses
        map(self.remove_analysis, to_remove)

    def resolve_specs(self, instance, results_ranges):
        """Returns a dictionary where the key is the service_uid and the value
        is its results range. The dictionary is made by extending the
        results_ranges passed-in with the Sample's ResultsRanges (a copy of the
        specifications initially set)
        """
        rrs = results_ranges or []

        # Sample's Results ranges
        sample_rrs = instance.getResultsRange()

        # Ensure all subfields from specification are kept and missing values
        # for subfields are filled in accordance with the specs
        rrs = map(lambda rr: self.resolve_range(rr, sample_rrs), rrs)

        # Append those from sample that are missing in the ranges passed-in
        service_uids = map(lambda rr: rr["uid"], rrs)
        rrs.extend(filter(lambda rr: rr["uid"] not in service_uids, sample_rrs))

        # Create a dict for easy access to results ranges
        return dict(map(lambda rr: (rr["uid"], rr), rrs))

    def resolve_range(self, result_range, sample_result_ranges):
        """Resolves the range by adding the uid if not present and filling the
        missing subfield values with those that come from the Sample
        specification if they are not present in the result_range passed-in
        """
        # Resolve result_range to make sure it contains the uid subfield
        rrs = self.resolve_uid(result_range)
        uid = rrs.get("uid")

        for sample_rr in sample_result_ranges:
            if uid and sample_rr.get("uid") == uid:
                # Keep same fields from sample
                rr = sample_rr.copy()
                rr.update(rrs)
                return rr

        # Return the original with no changes
        return rrs

    def resolve_uid(self, result_range):
        """Resolves the uid key for the result_range passed in, inferring it
        from the keyword when the uid is missing or not valid
        """
        value = result_range.copy()
        uid = value.get("uid")
        if api.is_uid(uid) and uid != "0":
            return value

        # uid key does not exist or is not valid, try to infer from keyword
        keyword = value.get("keyword")
        if keyword:
            query = dict(portal_type="AnalysisService", getKeyword=keyword)
            brains = api.search(query, SETUP_CATALOG)
            if len(brains) == 1:
                uid = api.get_uid(brains[0])
        value["uid"] = uid
        return value

    def add_analysis(self, instance, service, **kwargs):
        """Gets or creates the analyses for the given service and updates
        their hidden status, price, internal use flag and results range
        """
        service_uid = api.get_uid(service)

        # Ensure we have suitable parameters
        specs = kwargs.get("specs") or {}

        # Get the hidden status for the service
        hidden = kwargs.get("hidden") or []
        hidden = filter(lambda d: d.get("uid") == service_uid, hidden)
        hidden = hidden and hidden[0].get("hidden") or service.getHidden()

        # Get the price for the service
        prices = kwargs.get("prices") or {}
        price = prices.get(service_uid) or service.getPrice()

        # Gets the analysis or creates the analysis for this service
        # Note this returns a list, because it is possible to have multiple
        # partitions with the same analysis
        analyses = self.resolve_analyses(instance, service)

        # Filter out analyses in detached states
        # This allows re-adding an analysis that was retracted or cancelled
        analyses = filter(
            lambda an: api.get_workflow_status_of(an) not in DETACHED_STATES,
            analyses)

        if not analyses:
            # Create the analysis
            new_id = self.generate_analysis_id(instance, service)
            logger.info("Creating new analysis '{}'".format(new_id))
            analysis = create_analysis(instance, service, id=new_id)
            analyses.append(analysis)

        for analysis in analyses:
            # Set the hidden status
            analysis.setHidden(hidden)

            # Set the price of the Analysis
            analysis.setPrice(price)

            # Set the internal use status
            parent_sample = analysis.getRequest()
            analysis.setInternalUse(parent_sample.getInternalUse())

            # Set the result range to the analysis
            analysis_rr = specs.get(service_uid) or analysis.getResultsRange()
            analysis.setResultsRange(analysis_rr)
            analysis.reindexObject()

    def generate_analysis_id(self, instance, service):
        """Generate a new analysis ID
        """
        count = 1
        keyword = service.getKeyword()
        new_id = keyword
        while new_id in instance.objectIds():
            new_id = "{}-{}".format(keyword, count)
            count += 1
        return new_id

    def remove_analysis(self, analysis):
        """Removes a given analysis from the instance
        """
        # Remember assigned attachments
        # https://github.com/senaite/senaite.core/issues/1025
        attachments = analysis.getAttachment()
        analysis.setAttachment([])

        # If assigned to a worksheet, unassign it before deletion
        worksheet = analysis.getWorksheet()
        if worksheet:
            worksheet.removeAnalysis(analysis)

        # Handle the case where the source of a retest is deleted
        retest = analysis.getRetest()
        if retest:
            # Unset the reference link
            retest.setRetestOf(None)

        # Remove the analysis
        # Note the analysis might belong to a partition
        analysis.aq_parent.manage_delObjects(ids=[api.get_id(analysis)])

        # Remove orphaned attachments
        for attachment in attachments:
            if not attachment.getLinkedAnalyses():
                # Only delete attachments that are no longer linked
                logger.info(
                    "Deleting attachment: {}".format(attachment.getId()))
                attachment_id = api.get_id(attachment)
                api.get_parent(attachment).manage_delObjects(attachment_id)

    def resolve_analyses(self, instance, service):
        """Resolves analyses for the service and instance
        It returns a list because, for a given sample, multiple analyses for
        the same service can exist when the sample has multiple partitions
        """
        analyses = []

        # Does the analysis exist in this instance already?
        instance_analyses = self.get_from_instance(instance, service)

        if instance_analyses:
            analyses.extend(instance_analyses)

        # Does the analysis exist in an ancestor?
        from_ancestor = self.get_from_ancestor(instance, service)
        for ancestor_analysis in from_ancestor:
            # Move the analysis into this instance. The ancestor's
            # analysis will be masked otherwise
            analysis_id = api.get_id(ancestor_analysis)
            logger.info("Analysis {} is from an ancestor".format(analysis_id))
            cp = ancestor_analysis.aq_parent.manage_cutObjects(analysis_id)
            instance.manage_pasteObjects(cp)
            analyses.append(instance._getOb(analysis_id))

        # Does the analysis exist in descendants?
        from_descendant = self.get_from_descendant(instance, service)
        analyses.extend(from_descendant)

        return analyses

    def get_analyses_from_descendants(self, instance):
        """Returns all the analyses from descendants
        """
        analyses = []
        for descendant in instance.getDescendants(all_descendants=True):
            analyses.extend(descendant.objectValues("Analysis"))
        return analyses

    def get_from_instance(self, instance, service):
        """Returns analyses for the given service from the instance
        """
        service_uid = api.get_uid(service)
        analyses = instance.objectValues("Analysis")
        # Filter those analyses with the same service. Note that a Sample can
        # contain more than one analysis for the same service due to retests
        return filter(lambda an: an.getServiceUID() == service_uid, analyses)

    def get_from_ancestor(self, instance, service):
        """Returns analyses for the given service from ancestors
        """
        ancestor = instance.getParentAnalysisRequest()
        if not ancestor:
            return []

        analyses = self.get_from_instance(ancestor, service)
        return analyses or self.get_from_ancestor(ancestor, service)

    def get_from_descendant(self, instance, service):
        """Returns analyses for the given service from descendants
        """
        analyses = []
        for descendant in instance.getDescendants():
            # Does the analysis exist in the current descendant?
            descendant_analyses = self.get_from_instance(descendant, service)
            if descendant_analyses:
                analyses.extend(descendant_analyses)

            # Search in descendants from current descendant
            from_descendant = self.get_from_descendant(descendant, service)
            analyses.extend(from_descendant)

        return analyses

    def _to_service(self, thing):
        """Convert to Analysis Service

        :param thing: UID/Catalog Brain/Object/Something
        :returns: Analysis Service object or None
        """

        # Convert UIDs to objects
        if api.is_uid(thing):
            thing = api.get_object_by_uid(thing, None)

        # Bail out if the thing is not a valid object
        if not api.is_object(thing):
            logger.warn("'{}' is not a valid object!".format(repr(thing)))
            return None

        # Ensure we have an object here and not a brain
        obj = api.get_object(thing)

        if IAnalysisService.providedBy(obj):
            return obj

        if IAnalysis.providedBy(obj):
            return obj.getAnalysisService()

        # An object, but neither an Analysis nor AnalysisService?
        # This should never happen.
        portal_type = api.get_portal_type(obj)
        logger.error("ARAnalysesField doesn't accept objects from {} type. "
                     "The object will be dismissed.".format(portal_type))
        return None


registerField(ARAnalysesField,
              title="Analyses",
              description="Manages Analyses of ARs")
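
The merge performed by resolve_range() in the listing can be tried in isolation with plain dictionaries. The following is only a sketch: merge_range is a hypothetical standalone name, and the sample values are made up for illustration.

```python
def merge_range(result_range, sample_ranges):
    """Fill missing subfields of result_range from the matching sample range.

    Hypothetical standalone counterpart of resolve_range(); it assumes the
    uid has already been resolved.
    """
    uid = result_range.get("uid")
    for sample_rr in sample_ranges:
        if uid and sample_rr.get("uid") == uid:
            # Start from the sample's subfields, then let the values
            # passed in take precedence
            merged = dict(sample_rr)
            merged.update(result_range)
            return merged
    # No matching sample range: return a copy with no changes
    return dict(result_range)


# Made-up values for illustration
sample_ranges = [{"uid": "svc-1", "min": "5", "max": "10", "warn_min": "4"}]
merged = merge_range({"uid": "svc-1", "max": "12"}, sample_ranges)
unmatched = merge_range({"uid": "svc-9", "max": "1"}, sample_ranges)
```

Values passed in win over the sample's values, while subfields present only in the sample range are carried over.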