Passed
Push — 2.x ( d668b2...78a11b )
by Ramon
05:54
created

bika.lims.browser.fields.aranalysesfield   D

Complexity

Total Complexity 58

Size/Duplication

Total Lines 450
Duplicated Lines 4 %

Importance

Changes 0
Metric Value
wmc 58
eloc 216
dl 18
loc 450
rs 4.5599
c 0
b 0
f 0

14 Methods

Rating   Name   Duplication   Size   Complexity  
C ARAnalysesField.set() 0 60 9
A ARAnalysesField.get() 0 25 2
A ARAnalysesField.resolve_uid() 18 18 5
A ARAnalysesField.resolve_specs() 0 21 5
A ARAnalysesField.resolve_range() 0 18 4
A ARAnalysesField.resolve_conditions() 0 28 3
A ARAnalysesField.remove_analysis() 0 31 5
A ARAnalysesField.get_from_ancestor() 0 9 2
A ARAnalysesField._to_service() 0 31 5
A ARAnalysesField.get_from_descendant() 0 15 3
A ARAnalysesField.get_analyses_from_descendants() 0 7 2
B ARAnalysesField.add_analysis() 0 59 7
A ARAnalysesField.resolve_analyses() 0 32 4
A ARAnalysesField.get_from_instance() 0 8 2

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.

Common duplication problems can usually be resolved by extracting the repeated logic into a shared method, class or module, so that every caller uses the single implementation.
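For the duplication flagged in this file (resolve_uid() below shares 18 lines with other code in the project), that would mean moving the repeated block into one shared helper and calling it from every place that currently repeats it. The following is a minimal sketch only, assuming the repeated block is the keyword-to-UID lookup; the helper name and its module location are hypothetical, not part of the current codebase:

    # Hypothetical shared helper (sketch); location in the package is up to the project
    from bika.lims import api
    from senaite.core.catalog import SETUP_CATALOG


    def get_service_uid_by_keyword(keyword):
        """Returns the UID of the AnalysisService with the given keyword,
        or None if there is not exactly one match
        """
        query = dict(portal_type="AnalysisService", getKeyword=keyword)
        brains = api.search(query, SETUP_CATALOG)
        if len(brains) == 1:
            return api.get_uid(brains[0])
        return None

resolve_uid() and its duplicates elsewhere would then shrink to a call to this helper, and the duplicated lines disappear from the report.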

Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like bika.lims.browser.fields.aranalysesfield often do a lot of different things. To break such a class down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined which fields and methods belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often the faster option.
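In this class, the method list above already suggests such a group: get_from_instance(), get_from_ancestor(), get_from_descendant() and get_analyses_from_descendants() all locate existing analyses in the sample hierarchy. A minimal sketch of an Extract Class refactoring follows; AnalysisResolver is a hypothetical name and the method bodies are abridged copies of the code shown below, not a drop-in implementation:

    # Sketch only: a possible Extract Class for the "get_from_*" group
    from bika.lims import api


    class AnalysisResolver(object):
        """Locates existing analyses for a service in a sample,
        its ancestors and its descendants (hypothetical helper)
        """

        def __init__(self, instance):
            self.instance = instance

        def from_instance(self, service, instance=None):
            # Analyses of the given service contained in the sample itself
            instance = instance or self.instance
            service_uid = api.get_uid(service)
            analyses = instance.objectValues("Analysis")
            return filter(lambda an: an.getServiceUID() == service_uid, analyses)

        def from_ancestor(self, service, instance=None):
            # Walk up the partition hierarchy until a match is found
            instance = instance or self.instance
            ancestor = instance.getParentAnalysisRequest()
            if not ancestor:
                return []
            analyses = self.from_instance(service, instance=ancestor)
            return analyses or self.from_ancestor(service, instance=ancestor)

ARAnalysesField.resolve_analyses() and ARAnalysesField.set() would then delegate the hierarchy traversal to this helper, which removes several methods from the field and lowers its weighted method count.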

# -*- coding: utf-8 -*-
#
# This file is part of SENAITE.CORE.
#
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright 2018-2021 by it's authors.
# Some rights reserved, see README and LICENSE.

import itertools

from AccessControl import ClassSecurityInfo
from AccessControl import Unauthorized
from bika.lims import api
from bika.lims import logger
from bika.lims.api.security import check_permission
from bika.lims.interfaces import IAnalysis
from bika.lims.interfaces import IAnalysisService
from bika.lims.interfaces import IARAnalysesField
from bika.lims.interfaces import ISubmitted
from bika.lims.permissions import AddAnalysis
from bika.lims.utils.analysis import create_analysis
from Products.Archetypes.public import Field
from Products.Archetypes.public import ObjectField
from Products.Archetypes.Registry import registerField
from senaite.core.catalog import ANALYSIS_CATALOG
from senaite.core.catalog import SETUP_CATALOG
from zope.interface import implements

DETACHED_STATES = ["cancelled", "retracted", "rejected"]


"""Field to manage Analyses on ARs

Please see the assigned doctest at tests/doctests/ARAnalysesField.rst

Run this test from the buildout directory:

    bin/test test_textual_doctests -t ARAnalysesField
"""


class ARAnalysesField(ObjectField):
    """A field that stores Analyses instances
    """
    implements(IARAnalysesField)

    security = ClassSecurityInfo()
    _properties = Field._properties.copy()
    _properties.update({
        "type": "analyses",
        "default": None,
    })

    security.declarePrivate('get')

    def get(self, instance, **kwargs):
        """Returns a list of Analyses assigned to this AR

        Return a list of catalog brains unless `full_objects=True` is passed.
        Other keyword arguments are passed to senaite_catalog_analysis

        :param instance: Analysis Request object
        :param kwargs: Keyword arguments to inject in the search query
        :returns: A list of Analysis Objects/Catalog Brains
        """
        # Do we need to return objects or brains
        full_objects = kwargs.get("full_objects", False)

        # Bail out parameters from kwargs that don't match with indexes
        catalog = api.get_tool(ANALYSIS_CATALOG)
        indexes = catalog.indexes()
        query = dict([(k, v) for k, v in kwargs.items() if k in indexes])

        # Do the search against the catalog
        query["portal_type"] = "Analysis"
        query["getAncestorsUIDs"] = api.get_uid(instance)
        brains = catalog(query)
        if full_objects:
            return map(api.get_object, brains)
        return brains

    security.declarePrivate('set')

    def set(self, instance, items, prices=None, specs=None, hidden=None, **kw):
        """Set/Assign Analyses to this AR

        :param items: List of Analysis objects/brains, AnalysisService
                      objects/brains and/or Analysis Service uids
        :type items: list
        :param prices: Mapping of AnalysisService UID -> price
        :type prices: dict
        :param specs: List of AnalysisService UID -> Result Range mappings
        :type specs: list
        :param hidden: List of AnalysisService UID -> Hidden mappings
        :type hidden: list
        :returns: list of new assigned Analyses
        """
        if items is None:
            items = []

        # Bail out if the items is not a list type
        if not isinstance(items, (list, tuple)):
            raise TypeError(
                "Items parameter must be a tuple or list, got '{}'".format(
                    type(items)))

        # Bail out if the AR is inactive
        if not api.is_active(instance):
            raise Unauthorized("Inactive ARs can not be modified")

        # Bail out if the user has not the right permission
        if not check_permission(AddAnalysis, instance):
            raise Unauthorized("You do not have the '{}' permission"
                               .format(AddAnalysis))

        # Convert the items to a valid list of AnalysisServices
        services = filter(None, map(self._to_service, items))

        # Calculate dependencies
        dependencies = map(lambda s: s.getServiceDependencies(), services)
        dependencies = list(itertools.chain.from_iterable(dependencies))

        # Merge dependencies and services
        services = set(services + dependencies)

        # Modify existing AR specs with new form values of selected analyses
        specs = self.resolve_specs(instance, specs)

        # Add analyses
        params = dict(prices=prices, hidden=hidden, specs=specs)
        map(lambda serv: self.add_analysis(instance, serv, **params), services)

        # Get all analyses (those from descendants included)
        analyses = instance.objectValues("Analysis")
        analyses.extend(self.get_analyses_from_descendants(instance))

        # Bail out those not in services list or submitted
        uids = map(api.get_uid, services)
        to_remove = filter(lambda an: an.getServiceUID() not in uids, analyses)
        to_remove = filter(lambda an: not ISubmitted.providedBy(an), to_remove)

        # Remove analyses
        map(self.remove_analysis, to_remove)

    def resolve_specs(self, instance, results_ranges):
        """Returns a dictionary where the key is the service_uid and the value
        is its results range. The dictionary is made by extending the
        results_ranges passed-in with the Sample's ResultsRanges (a copy of the
        specifications initially set)
        """
        rrs = results_ranges or []

        # Sample's Results ranges
        sample_rrs = instance.getResultsRange()

        # Ensure all subfields from specification are kept and missing values
        # for subfields are filled in accordance with the specs
        rrs = map(lambda rr: self.resolve_range(rr, sample_rrs), rrs)

        # Append those from sample that are missing in the ranges passed-in
        service_uids = map(lambda rr: rr["uid"], rrs)
        rrs.extend(filter(lambda rr: rr["uid"] not in service_uids, sample_rrs))

        # Create a dict for easy access to results ranges
        return dict(map(lambda rr: (rr["uid"], rr), rrs))

    def resolve_range(self, result_range, sample_result_ranges):
        """Resolves the range by adding the uid if not present and filling the
        missing subfield values with those that come from the Sample
        specification if they are not present in the result_range passed-in
        """
        # Resolve result_range to make sure it contain uid subfield
        rrs = self.resolve_uid(result_range)
        uid = rrs.get("uid")

        for sample_rr in sample_result_ranges:
            if uid and sample_rr.get("uid") == uid:
                # Keep same fields from sample
                rr = sample_rr.copy()
                rr.update(rrs)
                return rr

        # Return the original with no changes
        return rrs

    def resolve_uid(self, result_range):
        """Resolves the uid key for the result_range passed in if it does not
        exist when contains a keyword
        """
        value = result_range.copy()
        uid = value.get("uid")
        if api.is_uid(uid) and uid != "0":
            return value

        # uid key does not exist or is not valid, try to infere from keyword
        keyword = value.get("keyword")
        if keyword:
            query = dict(portal_type="AnalysisService", getKeyword=keyword)
            brains = api.search(query, SETUP_CATALOG)
            if len(brains) == 1:
                uid = api.get_uid(brains[0])
        value["uid"] = uid
        return value

    def resolve_conditions(self, analysis):
        """Returns the conditions to be applied to this analysis by merging
        those already set at sample level with defaults
        """
        service = analysis.getAnalysisService()
        default_conditions = service.getConditions()

        # Extract the conditions set for this analysis already
        existing = analysis.getConditions()
        existing_titles = [cond.get("title") for cond in existing]

        def is_missing(condition):
            return condition.get("title") not in existing_titles

        # Add only those conditions that are missing
        missing = filter(is_missing, default_conditions)

        # Sort them to match with same order as in service
        titles = [condition.get("title") for condition in default_conditions]

        def index(condition):
            cond_title = condition.get("title")
            if cond_title in titles:
                return titles.index(cond_title)
            return len(titles)

        conditions = existing + missing
        return sorted(conditions, key=lambda con: index(con))

    def add_analysis(self, instance, service, **kwargs):
        service_uid = api.get_uid(service)

        # Ensure we have suitable parameters
        specs = kwargs.get("specs") or {}

        # Get the hidden status for the service
        hidden = kwargs.get("hidden") or []
        hidden = filter(lambda d: d.get("uid") == service_uid, hidden)
        hidden = hidden and hidden[0].get("hidden") or service.getHidden()

        # Get the price for the service
        prices = kwargs.get("prices") or {}
        price = prices.get(service_uid) or service.getPrice()

        # Get the default result for the service
        default_result = service.getDefaultResult()

        # Gets the analysis or creates the analysis for this service
        # Note this returns a list, because is possible to have multiple
        # partitions with same analysis
        analyses = self.resolve_analyses(instance, service)

        # Filter out analyses in detached states
        # This allows to re-add an analysis that was retracted or cancelled
        analyses = filter(
            lambda an: api.get_workflow_status_of(an) not in DETACHED_STATES,
            analyses)

        if not analyses:
            # Create the analysis
            analysis = create_analysis(instance, service)
            analyses.append(analysis)

        for analysis in analyses:
            # Set the hidden status
            analysis.setHidden(hidden)

            # Set the price of the Analysis
            analysis.setPrice(price)

            # Set the internal use status
            parent_sample = analysis.getRequest()
            analysis.setInternalUse(parent_sample.getInternalUse())

            # Set the default result to the analysis
            if not analysis.getResult() and default_result:
                analysis.setResult(default_result)
                analysis.setResultCaptureDate(None)

            # Set the result range to the analysis
            analysis_rr = specs.get(service_uid) or analysis.getResultsRange()
            analysis.setResultsRange(analysis_rr)

            # Set default (pre)conditions
            conditions = self.resolve_conditions(analysis)
            analysis.setConditions(conditions)

            analysis.reindexObject()

    def remove_analysis(self, analysis):
        """Removes a given analysis from the instance
        """
        # Remember assigned attachments
        # https://github.com/senaite/senaite.core/issues/1025
        attachments = analysis.getAttachment()
        analysis.setAttachment([])

        # If assigned to a worksheet, unassign it before deletion
        worksheet = analysis.getWorksheet()
        if worksheet:
            worksheet.removeAnalysis(analysis)

        # handle retest source deleted
        retest = analysis.getRetest()
        if retest:
            # unset reference link
            retest.setRetestOf(None)

        # Remove the analysis
        # Note the analysis might belong to a partition
        analysis.aq_parent.manage_delObjects(ids=[api.get_id(analysis)])

        # Remove orphaned attachments
        for attachment in attachments:
            if not attachment.getLinkedAnalyses():
                # only delete attachments which are no further linked
                logger.info(
                    "Deleting attachment: {}".format(attachment.getId()))
                attachment_id = api.get_id(attachment)
                api.get_parent(attachment).manage_delObjects(attachment_id)

    def resolve_analyses(self, instance, service):
        """Resolves analyses for the service and instance
        It returns a list, cause for a given sample, multiple analyses for same
        service can exist due to the possibility of having multiple partitions
        """
        analyses = []

        # Does the analysis exists in this instance already?
        instance_analyses = self.get_from_instance(instance, service)

        if instance_analyses:
            analyses.extend(instance_analyses)

        # Does the analysis exists in an ancestor?
        from_ancestor = self.get_from_ancestor(instance, service)
        for ancestor_analysis in from_ancestor:
            # only move non-assigned analyses
            state = api.get_workflow_status_of(ancestor_analysis)
            if state != "unassigned":
                continue
            # Move the analysis into the partition
            analysis_id = api.get_id(ancestor_analysis)
            logger.info("Analysis {} is from an ancestor".format(analysis_id))
            cp = ancestor_analysis.aq_parent.manage_cutObjects(analysis_id)
            instance.manage_pasteObjects(cp)
            analyses.append(instance._getOb(analysis_id))

        # Does the analysis exists in descendants?
        from_descendant = self.get_from_descendant(instance, service)
        analyses.extend(from_descendant)

        return analyses

    def get_analyses_from_descendants(self, instance):
        """Returns all the analyses from descendants
        """
        analyses = []
        for descendant in instance.getDescendants(all_descendants=True):
            analyses.extend(descendant.objectValues("Analysis"))
        return analyses

    def get_from_instance(self, instance, service):
        """Returns analyses for the given service from the instance
        """
        service_uid = api.get_uid(service)
        analyses = instance.objectValues("Analysis")
        # Filter those analyses with same keyword. Note that a Sample can
        # contain more than one analysis with same keyword because of retests
        return filter(lambda an: an.getServiceUID() == service_uid, analyses)

    def get_from_ancestor(self, instance, service):
        """Returns analyses for the given service from ancestors
        """
        ancestor = instance.getParentAnalysisRequest()
        if not ancestor:
            return []

        analyses = self.get_from_instance(ancestor, service)
        return analyses or self.get_from_ancestor(ancestor, service)

    def get_from_descendant(self, instance, service):
        """Returns analyses for the given service from descendants
        """
        analyses = []
        for descendant in instance.getDescendants():
            # Does the analysis exists in the current descendant?
            descendant_analyses = self.get_from_instance(descendant, service)
            if descendant_analyses:
                analyses.extend(descendant_analyses)

            # Search in descendants from current descendant
            from_descendant = self.get_from_descendant(descendant, service)
            analyses.extend(from_descendant)

        return analyses

    def _to_service(self, thing):
        """Convert to Analysis Service

        :param thing: UID/Catalog Brain/Object/Something
        :returns: Analysis Service object or None
        """

        # Convert UIDs to objects
        if api.is_uid(thing):
            thing = api.get_object_by_uid(thing, None)

        # Bail out if the thing is not a valid object
        if not api.is_object(thing):
            logger.warn("'{}' is not a valid object!".format(repr(thing)))
            return None

        # Ensure we have an object here and not a brain
        obj = api.get_object(thing)

        if IAnalysisService.providedBy(obj):
            return obj

        if IAnalysis.providedBy(obj):
            return obj.getAnalysisService()

        # An object, but neither an Analysis nor AnalysisService?
        # This should never happen.
        portal_type = api.get_portal_type(obj)
        logger.error("ARAnalysesField doesn't accept objects from {} type. "
                     "The object will be dismissed.".format(portal_type))
        return None


registerField(ARAnalysesField,
              title="Analyses",
              description="Manages Analyses of ARs")
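For reference, the get() and set() docstrings above translate into roughly the following usage. This is a sketch only, assuming the field is registered under the name "Analyses" on the Analysis Request schema; `sample` and the service UID are placeholders:

    # `sample` is an Analysis Request object; the UID below is a placeholder
    field = sample.getField("Analyses")

    # Assign analyses by Analysis Service UID. Service dependencies are added
    # automatically; analyses that are no longer in the list and not yet
    # submitted are removed.
    field.set(sample, ["<analysis-service-uid>"])

    # Read back the assigned analyses: catalog brains by default,
    # full objects when full_objects=True is passed
    brains = field.get(sample)
    analyses = field.get(sample, full_objects=True)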