Passed
Push — 2.x ( 682a59...d1c46a )
by Ramon
06:01 queued 24s
created

bika.lims.browser.fields.aranalysesfield   F

Complexity

Total Complexity 60

Size/Duplication

Total Lines 463
Duplicated Lines 3.89 %

Importance

Changes 0
Metric Value
wmc 60
eloc 226
dl 18
loc 463
rs 3.6
c 0
b 0
f 0

15 Methods

Rating   Name   Duplication   Size   Complexity  
C ARAnalysesField.set() 0 60 9
A ARAnalysesField.get() 0 25 2
A ARAnalysesField.resolve_uid() 18 18 5
A ARAnalysesField.resolve_specs() 0 21 5
A ARAnalysesField.resolve_range() 0 18 4
A ARAnalysesField.remove_analysis() 0 31 5
A ARAnalysesField.get_from_ancestor() 0 9 2
A ARAnalysesField._to_service() 0 31 5
A ARAnalysesField.get_from_descendant() 0 15 3
A ARAnalysesField.get_analyses_from_descendants() 0 7 2
A ARAnalysesField.resolve_analyses() 0 32 4
B ARAnalysesField.add_analysis() 0 61 7
A ARAnalysesField.generate_analysis_id() 0 10 2
A ARAnalysesField.get_from_instance() 0 8 2
A ARAnalysesField.resolve_conditions() 0 28 3

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule of thumb that is often applied is to restructure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like bika.lims.browser.fields.aranalysesfield often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
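For illustration, here is a minimal, hypothetical sketch of Extract Class applied to a field like this one: the `get_from_instance`/`get_from_ancestor`/`get_from_descendant` methods share a prefix and only traverse the sample partition tree, so they are natural candidates to move into a collaborator class. The `Sample` and `AnalysisResolver` names below are illustrative stand-ins, not SENAITE API.

```python
class Sample(object):
    """Minimal stand-in for an Analysis Request in a partition tree."""

    def __init__(self, analyses=None, parent=None):
        self.analyses = analyses or []  # service keywords held directly
        self.parent = parent
        self.children = []
        if parent is not None:
            parent.children.append(self)


class AnalysisResolver(object):
    """Extracted component: all tree-traversal lookups live here,
    leaving the field class to deal only with assignment logic."""

    def get_from_instance(self, sample, keyword):
        # Analyses held directly by this sample
        return [kw for kw in sample.analyses if kw == keyword]

    def get_from_ancestor(self, sample, keyword):
        # Walk up the partition tree until a match is found
        if sample.parent is None:
            return []
        found = self.get_from_instance(sample.parent, keyword)
        return found or self.get_from_ancestor(sample.parent, keyword)

    def get_from_descendant(self, sample, keyword):
        # Recursively collect matches from all partitions below
        found = []
        for child in sample.children:
            found.extend(self.get_from_instance(child, keyword))
            found.extend(self.get_from_descendant(child, keyword))
        return found
```

The field class would then hold an `AnalysisResolver` instance and delegate to it, shrinking its own method count and complexity.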

# -*- coding: utf-8 -*-
#
# This file is part of SENAITE.CORE.
#
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright 2018-2021 by it's authors.
# Some rights reserved, see README and LICENSE.

import itertools

from AccessControl import ClassSecurityInfo
from AccessControl import Unauthorized
from bika.lims import api
from bika.lims import logger
from bika.lims.api.security import check_permission
from bika.lims.interfaces import IAnalysis
from bika.lims.interfaces import IAnalysisService
from bika.lims.interfaces import IARAnalysesField
from bika.lims.interfaces import ISubmitted
from bika.lims.permissions import AddAnalysis
from bika.lims.utils.analysis import create_analysis
from Products.Archetypes.public import Field
from Products.Archetypes.public import ObjectField
from Products.Archetypes.Registry import registerField
from senaite.core.catalog import ANALYSIS_CATALOG
from senaite.core.catalog import SETUP_CATALOG
from zope.interface import implements

DETACHED_STATES = ["cancelled", "retracted", "rejected"]


"""Field to manage Analyses on ARs

Please see the assigned doctest at tests/doctests/ARAnalysesField.rst

Run this test from the buildout directory:

    bin/test test_textual_doctests -t ARAnalysesField
"""


class ARAnalysesField(ObjectField):
    """A field that stores Analyses instances
    """
    implements(IARAnalysesField)

    security = ClassSecurityInfo()
    _properties = Field._properties.copy()
    _properties.update({
        "type": "analyses",
        "default": None,
    })

    security.declarePrivate('get')

    def get(self, instance, **kwargs):
        """Returns a list of Analyses assigned to this AR

        Return a list of catalog brains unless `full_objects=True` is passed.
        Other keyword arguments are passed to senaite_catalog_analysis

        :param instance: Analysis Request object
        :param kwargs: Keyword arguments to inject in the search query
        :returns: A list of Analysis Objects/Catalog Brains
        """
        # Do we need to return objects or brains
        full_objects = kwargs.get("full_objects", False)

        # Bail out parameters from kwargs that don't match with indexes
        catalog = api.get_tool(ANALYSIS_CATALOG)
        indexes = catalog.indexes()
        query = dict([(k, v) for k, v in kwargs.items() if k in indexes])

        # Do the search against the catalog
        query["portal_type"] = "Analysis"
        query["getAncestorsUIDs"] = api.get_uid(instance)
        brains = catalog(query)
        if full_objects:
            return map(api.get_object, brains)
        return brains

    security.declarePrivate('set')

    def set(self, instance, items, prices=None, specs=None, hidden=None, **kw):
        """Set/Assign Analyses to this AR

        :param items: List of Analysis objects/brains, AnalysisService
                      objects/brains and/or Analysis Service uids
        :type items: list
        :param prices: Mapping of AnalysisService UID -> price
        :type prices: dict
        :param specs: List of AnalysisService UID -> Result Range mappings
        :type specs: list
        :param hidden: List of AnalysisService UID -> Hidden mappings
        :type hidden: list
        :returns: list of new assigned Analyses
        """
        if items is None:
            items = []

        # Bail out if the items is not a list type
        if not isinstance(items, (list, tuple)):
            raise TypeError(
                "Items parameter must be a tuple or list, got '{}'".format(
                    type(items)))

        # Bail out if the AR is inactive
        if not api.is_active(instance):
            raise Unauthorized("Inactive ARs can not be modified")

        # Bail out if the user has not the right permission
        if not check_permission(AddAnalysis, instance):
            raise Unauthorized("You do not have the '{}' permission"
                               .format(AddAnalysis))

        # Convert the items to a valid list of AnalysisServices
        services = filter(None, map(self._to_service, items))

        # Calculate dependencies
        dependencies = map(lambda s: s.getServiceDependencies(), services)
        dependencies = list(itertools.chain.from_iterable(dependencies))

        # Merge dependencies and services
        services = set(services + dependencies)

        # Modify existing AR specs with new form values of selected analyses
        specs = self.resolve_specs(instance, specs)

        # Add analyses
        params = dict(prices=prices, hidden=hidden, specs=specs)
        map(lambda serv: self.add_analysis(instance, serv, **params), services)

        # Get all analyses (those from descendants included)
        analyses = instance.objectValues("Analysis")
        analyses.extend(self.get_analyses_from_descendants(instance))

        # Bail out those not in services list or submitted
        uids = map(api.get_uid, services)
        to_remove = filter(lambda an: an.getServiceUID() not in uids, analyses)
        to_remove = filter(lambda an: not ISubmitted.providedBy(an), to_remove)

        # Remove analyses
        map(self.remove_analysis, to_remove)

    def resolve_specs(self, instance, results_ranges):
        """Returns a dictionary where the key is the service_uid and the value
        is its results range. The dictionary is made by extending the
        results_ranges passed-in with the Sample's ResultsRanges (a copy of the
        specifications initially set)
        """
        rrs = results_ranges or []

        # Sample's Results ranges
        sample_rrs = instance.getResultsRange()

        # Ensure all subfields from specification are kept and missing values
        # for subfields are filled in accordance with the specs
        rrs = map(lambda rr: self.resolve_range(rr, sample_rrs), rrs)

        # Append those from sample that are missing in the ranges passed-in
        service_uids = map(lambda rr: rr["uid"], rrs)
        rrs.extend(filter(lambda rr: rr["uid"] not in service_uids, sample_rrs))

        # Create a dict for easy access to results ranges
        return dict(map(lambda rr: (rr["uid"], rr), rrs))

    def resolve_range(self, result_range, sample_result_ranges):
        """Resolves the range by adding the uid if not present and filling the
        missing subfield values with those that come from the Sample
        specification if they are not present in the result_range passed-in
        """
        # Resolve result_range to make sure it contain uid subfield
        rrs = self.resolve_uid(result_range)
        uid = rrs.get("uid")

        for sample_rr in sample_result_ranges:
            if uid and sample_rr.get("uid") == uid:
                # Keep same fields from sample
                rr = sample_rr.copy()
                rr.update(rrs)
                return rr

        # Return the original with no changes
        return rrs

    def resolve_uid(self, result_range):
        """Resolves the uid key for the result_range passed in if it does not
        exist when contains a keyword
        """
        value = result_range.copy()
        uid = value.get("uid")
        if api.is_uid(uid) and uid != "0":
            return value

        # uid key does not exist or is not valid, try to infere from keyword
        keyword = value.get("keyword")
        if keyword:
            query = dict(portal_type="AnalysisService", getKeyword=keyword)
            brains = api.search(query, SETUP_CATALOG)
            if len(brains) == 1:
                uid = api.get_uid(brains[0])
        value["uid"] = uid
        return value

    def resolve_conditions(self, analysis):
        """Returns the conditions to be applied to this analysis by merging
        those already set at sample level with defaults
        """
        service = analysis.getAnalysisService()
        default_conditions = service.getConditions()

        # Extract the conditions set for this analysis already
        existing = analysis.getConditions()
        existing_titles = [cond.get("title") for cond in existing]

        def is_missing(condition):
            return condition.get("title") not in existing_titles

        # Add only those conditions that are missing
        missing = filter(is_missing, default_conditions)

        # Sort them to match with same order as in service
        titles = [condition.get("title") for condition in default_conditions]

        def index(condition):
            cond_title = condition.get("title")
            if cond_title in titles:
                return titles.index(cond_title)
            return len(titles)

        conditions = existing + missing
        return sorted(conditions, key=lambda con: index(con))

    def add_analysis(self, instance, service, **kwargs):
        service_uid = api.get_uid(service)

        # Ensure we have suitable parameters
        specs = kwargs.get("specs") or {}

        # Get the hidden status for the service
        hidden = kwargs.get("hidden") or []
        hidden = filter(lambda d: d.get("uid") == service_uid, hidden)
        hidden = hidden and hidden[0].get("hidden") or service.getHidden()

        # Get the price for the service
        prices = kwargs.get("prices") or {}
        price = prices.get(service_uid) or service.getPrice()

        # Get the default result for the service
        default_result = service.getDefaultResult()

        # Gets the analysis or creates the analysis for this service
        # Note this returns a list, because is possible to have multiple
        # partitions with same analysis
        analyses = self.resolve_analyses(instance, service)

        # Filter out analyses in detached states
        # This allows to re-add an analysis that was retracted or cancelled
        analyses = filter(
            lambda an: api.get_workflow_status_of(an) not in DETACHED_STATES,
            analyses)

        if not analyses:
            # Create the analysis
            new_id = self.generate_analysis_id(instance, service)
            logger.info("Creating new analysis '{}'".format(new_id))
            analysis = create_analysis(instance, service, id=new_id)
            analyses.append(analysis)

        for analysis in analyses:
            # Set the hidden status
            analysis.setHidden(hidden)

            # Set the price of the Analysis
            analysis.setPrice(price)

            # Set the internal use status
            parent_sample = analysis.getRequest()
            analysis.setInternalUse(parent_sample.getInternalUse())

            # Set the default result to the analysis
            if not analysis.getResult() and default_result:
                analysis.setResult(default_result)
                analysis.setResultCaptureDate(None)

            # Set the result range to the analysis
            analysis_rr = specs.get(service_uid) or analysis.getResultsRange()
            analysis.setResultsRange(analysis_rr)

            # Set default (pre)conditions
            conditions = self.resolve_conditions(analysis)
            analysis.setConditions(conditions)

            analysis.reindexObject()

    def generate_analysis_id(self, instance, service):
        """Generate a new analysis ID
        """
        count = 1
        keyword = service.getKeyword()
        new_id = keyword
        while new_id in instance.objectIds():
            new_id = "{}-{}".format(keyword, count)
            count += 1
        return new_id

    def remove_analysis(self, analysis):
        """Removes a given analysis from the instance
        """
        # Remember assigned attachments
        # https://github.com/senaite/senaite.core/issues/1025
        attachments = analysis.getAttachment()
        analysis.setAttachment([])

        # If assigned to a worksheet, unassign it before deletion
        worksheet = analysis.getWorksheet()
        if worksheet:
            worksheet.removeAnalysis(analysis)

        # handle retest source deleted
        retest = analysis.getRetest()
        if retest:
            # unset reference link
            retest.setRetestOf(None)

        # Remove the analysis
        # Note the analysis might belong to a partition
        analysis.aq_parent.manage_delObjects(ids=[api.get_id(analysis)])

        # Remove orphaned attachments
        for attachment in attachments:
            if not attachment.getLinkedAnalyses():
                # only delete attachments which are no further linked
                logger.info(
                    "Deleting attachment: {}".format(attachment.getId()))
                attachment_id = api.get_id(attachment)
                api.get_parent(attachment).manage_delObjects(attachment_id)

    def resolve_analyses(self, instance, service):
        """Resolves analyses for the service and instance
        It returns a list, cause for a given sample, multiple analyses for same
        service can exist due to the possibility of having multiple partitions
        """
        analyses = []

        # Does the analysis exists in this instance already?
        instance_analyses = self.get_from_instance(instance, service)

        if instance_analyses:
            analyses.extend(instance_analyses)

        # Does the analysis exists in an ancestor?
        from_ancestor = self.get_from_ancestor(instance, service)
        for ancestor_analysis in from_ancestor:
            # only move non-assigned analyses
            state = api.get_workflow_status_of(ancestor_analysis)
            if state != "unassigned":
                continue
            # Move the analysis into the partition
            analysis_id = api.get_id(ancestor_analysis)
            logger.info("Analysis {} is from an ancestor".format(analysis_id))
            cp = ancestor_analysis.aq_parent.manage_cutObjects(analysis_id)
            instance.manage_pasteObjects(cp)
            analyses.append(instance._getOb(analysis_id))

        # Does the analysis exists in descendants?
        from_descendant = self.get_from_descendant(instance, service)
        analyses.extend(from_descendant)

        return analyses

    def get_analyses_from_descendants(self, instance):
        """Returns all the analyses from descendants
        """
        analyses = []
        for descendant in instance.getDescendants(all_descendants=True):
            analyses.extend(descendant.objectValues("Analysis"))
        return analyses

    def get_from_instance(self, instance, service):
        """Returns analyses for the given service from the instance
        """
        service_uid = api.get_uid(service)
        analyses = instance.objectValues("Analysis")
        # Filter those analyses with same keyword. Note that a Sample can
        # contain more than one analysis with same keyword because of retests
        return filter(lambda an: an.getServiceUID() == service_uid, analyses)

    def get_from_ancestor(self, instance, service):
        """Returns analyses for the given service from ancestors
        """
        ancestor = instance.getParentAnalysisRequest()
        if not ancestor:
            return []

        analyses = self.get_from_instance(ancestor, service)
        return analyses or self.get_from_ancestor(ancestor, service)

    def get_from_descendant(self, instance, service):
        """Returns analyses for the given service from descendants
        """
        analyses = []
        for descendant in instance.getDescendants():
            # Does the analysis exists in the current descendant?
            descendant_analyses = self.get_from_instance(descendant, service)
            if descendant_analyses:
                analyses.extend(descendant_analyses)

            # Search in descendants from current descendant
            from_descendant = self.get_from_descendant(descendant, service)
            analyses.extend(from_descendant)

        return analyses

    def _to_service(self, thing):
        """Convert to Analysis Service

        :param thing: UID/Catalog Brain/Object/Something
        :returns: Analysis Service object or None
        """

        # Convert UIDs to objects
        if api.is_uid(thing):
            thing = api.get_object_by_uid(thing, None)

        # Bail out if the thing is not a valid object
        if not api.is_object(thing):
            logger.warn("'{}' is not a valid object!".format(repr(thing)))
            return None

        # Ensure we have an object here and not a brain
        obj = api.get_object(thing)

        if IAnalysisService.providedBy(obj):
            return obj

        if IAnalysis.providedBy(obj):
            return obj.getAnalysisService()

        # An object, but neither an Analysis nor AnalysisService?
        # This should never happen.
        portal_type = api.get_portal_type(obj)
        logger.error("ARAnalysesField doesn't accept objects from {} type. "
                     "The object will be dismissed.".format(portal_type))
        return None


registerField(ARAnalysesField,
              title="Analyses",
              description="Manages Analyses of ARs")
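As a concrete illustration of the listing above, `generate_analysis_id` derives a unique object id by suffixing the service keyword with an incrementing counter until an unused id is found. The same loop, rewritten as a standalone function over a plain collection of existing ids (the `existing_ids` parameter is an illustrative stand-in for `instance.objectIds()`):

```python
def generate_analysis_id(existing_ids, keyword):
    """Return `keyword` unchanged if it is free, otherwise the first
    free `keyword-N` with N >= 1, mirroring the counter loop in
    ARAnalysesField.generate_analysis_id."""
    count = 1
    new_id = keyword
    while new_id in existing_ids:
        new_id = "{}-{}".format(keyword, count)
        count += 1
    return new_id
```

For example, `generate_analysis_id(["Fe", "Fe-1"], "Fe")` returns `"Fe-2"`, which is how a sample can hold several analyses for the same service (e.g. retests) without id collisions.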