Passed: Push to 2.x ( 380493...9aebce ) by Jordi, created 06:51

senaite.core.datamanagers.field.sample_analyses (rating: D)

Complexity
Total Complexity: 59

Size/Duplication
Total Lines: 405
Duplicated Lines: 4.44 %

Importance
Changes: 0

Metric   Value
wmc      59
eloc     205
dl       18
loc      405
rs       4.08
c        0
b        0
f        0

15 Methods

Rating   Name   Duplication   Size   Complexity  
A SampleAnalysesFieldDataManager.remove_analysis() 0 31 5
C SampleAnalysesFieldDataManager.set() 0 61 9
A SampleAnalysesFieldDataManager.resolve_range() 0 18 4
A SampleAnalysesFieldDataManager.get_from_ancestor() 0 9 2
A SampleAnalysesFieldDataManager.resolve_conditions() 0 28 3
A SampleAnalysesFieldDataManager.get_analyses_from_descendants() 0 7 2
A SampleAnalysesFieldDataManager._to_service() 0 31 5
A SampleAnalysesFieldDataManager.get() 0 25 2
A SampleAnalysesFieldDataManager.resolve_specs() 0 21 5
A SampleAnalysesFieldDataManager.resolve_uid() 18 18 5
A SampleAnalysesFieldDataManager.get_from_descendant() 0 15 3
A SampleAnalysesFieldDataManager.get_from_instance() 0 8 2
B SampleAnalysesFieldDataManager.add_analysis() 0 59 7
A SampleAnalysesFieldDataManager.resolve_analyses() 0 32 4
A SampleAnalysesFieldDataManager.__init__() 0 4 1

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A commonly used rule of thumb is to restructure code once it is duplicated in three or more places.

Common duplication problems, and their corresponding solutions, include: identical code within a single class (extract it into a shared method), identical code in sibling classes (extract the method and pull it up into a common base class), and identical code in unrelated classes (extract it into a separate helper class or function).
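
For example, the duplicated resolve_uid() logic flagged in the listing below could be extracted into a shared helper that both duplicated sites delegate to. The sketch below is only an illustration: the helper name resolve_service_uid and its module-level placement are assumptions, not existing senaite.core API.

# Hypothetical shared helper (name and location assumed); it reuses the
# lookup logic currently duplicated in resolve_uid().
from bika.lims import api
from senaite.core.catalog import SETUP_CATALOG


def resolve_service_uid(result_range):
    """Return a copy of result_range with a valid "uid" key, inferring it
    from the "keyword" subfield when no valid uid is present.
    """
    value = result_range.copy()
    uid = value.get("uid")
    if api.is_uid(uid) and uid != "0":
        return value

    # No valid uid: try to infer it from the service keyword
    keyword = value.get("keyword")
    if keyword:
        query = dict(portal_type="AnalysisService", getKeyword=keyword)
        brains = api.search(query, SETUP_CATALOG)
        if len(brains) == 1:
            uid = api.get_uid(brains[0])
    value["uid"] = uid
    return value

The data manager's resolve_uid() would then reduce to a one-line delegation to this helper, and the other duplicated site could import it as well.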

Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like senaite.core.datamanagers.field.sample_analyses often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
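
As an illustration only, the get_from_* lookup methods listed in the methods table above share a prefix and form such a cohesive component. A minimal sketch of an extracted collaborator follows; the class name SampleAnalysisLookup and its API are assumptions, not existing senaite.core code.

# Hypothetical result of an Extract Class refactoring: the data manager
# would delegate its per-service lookups to this small collaborator.
from bika.lims import api


class SampleAnalysisLookup(object):

    def from_instance(self, instance, service):
        """Analyses of the given service contained directly in instance"""
        service_uid = api.get_uid(service)
        analyses = instance.objectValues("Analysis")
        return filter(lambda an: an.getServiceUID() == service_uid, analyses)

    def from_ancestor(self, instance, service):
        """Analyses of the given service found in the closest ancestor"""
        ancestor = instance.getParentAnalysisRequest()
        if not ancestor:
            return []
        analyses = self.from_instance(ancestor, service)
        return analyses or self.from_ancestor(ancestor, service)

    def from_descendants(self, instance, service):
        """Analyses of the given service found in any descendant"""
        analyses = []
        for descendant in instance.getDescendants():
            analyses.extend(self.from_instance(descendant, service))
            analyses.extend(self.from_descendants(descendant, service))
        return analyses

The full source of the module, as analyzed, follows.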

# -*- coding: utf-8 -*-

import itertools

from AccessControl import Unauthorized
from bika.lims import api
from bika.lims import logger
from bika.lims.api.security import check_permission
from bika.lims.interfaces import IAnalysis
from bika.lims.interfaces import IAnalysisService
from bika.lims.interfaces import ISubmitted
from bika.lims.utils.analysis import create_analysis
from senaite.core.catalog import ANALYSIS_CATALOG
from senaite.core.catalog import SETUP_CATALOG
from senaite.core.datamanagers.base import FieldDataManager
from senaite.core.permissions import AddAnalysis

DETACHED_STATES = ["cancelled", "retracted", "rejected"]


class SampleAnalysesFieldDataManager(FieldDataManager):
    """Data Manager for Routine Analyses
    """
    def __init__(self, context, request, field):
        self.context = context
        self.request = request
        self.field = field

    def get(self, **kw):
        """Returns a list of Analyses assigned to this AR

        Return a list of catalog brains unless `full_objects=True` is passed.
        Other keyword arguments are passed to senaite_catalog_analysis

        :param instance: Analysis Request object
        :param kwargs: Keyword arguments to inject in the search query
        :returns: A list of Analysis Objects/Catalog Brains
        """
        # Filter out parameters from kwargs that don't match with indexes
        catalog = api.get_tool(ANALYSIS_CATALOG)
        indexes = catalog.indexes()
        query = dict([(k, v) for k, v in kw.items() if k in indexes])

        query["portal_type"] = "Analysis"
        query["getAncestorsUIDs"] = api.get_uid(self.context)
        query["sort_on"] = kw.get("sort_on", "sortable_title")
        query["sort_order"] = kw.get("sort_order", "ascending")

        # Do the search against the catalog
        brains = catalog(query)
        if kw.get("full_objects", False):
            return map(api.get_object, brains)
        return brains

    def set(self, items, prices, specs, hidden, **kw):
        """Set/Assign Analyses to this AR

        :param items: List of Analysis objects/brains, AnalysisService
                      objects/brains and/or Analysis Service uids
        :type items: list
        :param prices: Mapping of AnalysisService UID -> price
        :type prices: dict
        :param specs: List of AnalysisService UID -> Result Range mappings
        :type specs: list
        :param hidden: List of AnalysisService UID -> Hidden mappings
        :type hidden: list
        :returns: list of new assigned Analyses
        """

        if items is None:
            items = []

        # Bail out if items is not a list type
        if not isinstance(items, (list, tuple)):
            raise TypeError(
                "Items parameter must be a tuple or list, got '{}'".format(
                    type(items)))

        # Bail out if the AR is inactive
        if not api.is_active(self.context):
            raise Unauthorized("Inactive ARs can not be modified")

        # Bail out if the user does not have the right permission
        if not check_permission(AddAnalysis, self.context):
            raise Unauthorized("You do not have the '{}' permission"
                               .format(AddAnalysis))

        # Convert the items to a valid list of AnalysisServices
        services = filter(None, map(self._to_service, items))

        # Calculate dependencies
        dependencies = map(lambda s: s.getServiceDependencies(), services)
        dependencies = list(itertools.chain.from_iterable(dependencies))

        # Merge dependencies and services
        services = set(services + dependencies)

        # Modify existing AR specs with new form values of selected analyses
        specs = self.resolve_specs(self.context, specs)

        # Add analyses
        params = dict(prices=prices, hidden=hidden, specs=specs)
        map(lambda serv: self.add_analysis(self.context, serv, **params), services)

        # Get all analyses (those from descendants included)
        analyses = self.context.objectValues("Analysis")
        analyses.extend(self.get_analyses_from_descendants(self.context))

        # Bail out those not in services list or submitted
        uids = map(api.get_uid, services)
        to_remove = filter(lambda an: an.getServiceUID() not in uids, analyses)
        to_remove = filter(lambda an: not ISubmitted.providedBy(an), to_remove)

        # Remove analyses
        map(self.remove_analysis, to_remove)

    def resolve_specs(self, instance, results_ranges):
        """Returns a dictionary where the key is the service_uid and the value
        is its results range. The dictionary is made by extending the
        results_ranges passed-in with the Sample's ResultsRanges (a copy of the
        specifications initially set)
        """
        rrs = results_ranges or []

        # Sample's Results ranges
        sample_rrs = instance.getResultsRange()

        # Ensure all subfields from specification are kept and missing values
        # for subfields are filled in accordance with the specs
        rrs = map(lambda rr: self.resolve_range(rr, sample_rrs), rrs)

        # Append those from sample that are missing in the ranges passed-in
        service_uids = map(lambda rr: rr["uid"], rrs)
        rrs.extend(filter(lambda rr: rr["uid"] not in service_uids, sample_rrs))

        # Create a dict for easy access to results ranges
        return dict(map(lambda rr: (rr["uid"], rr), rrs))

    def resolve_range(self, result_range, sample_result_ranges):
        """Resolves the range by adding the uid if not present and filling the
        missing subfield values with those that come from the Sample
        specification if they are not present in the result_range passed-in
        """
        # Resolve result_range to make sure it contains the uid subfield
        rrs = self.resolve_uid(result_range)
        uid = rrs.get("uid")

        for sample_rr in sample_result_ranges:
            if uid and sample_rr.get("uid") == uid:
                # Keep same fields from sample
                rr = sample_rr.copy()
                rr.update(rrs)
                return rr

        # Return the original with no changes
        return rrs

[View Code Duplication] This code seems to be duplicated in your project.
    def resolve_uid(self, result_range):
        """Resolves the uid key for the result_range passed in if it does not
        exist but a keyword is present
        """
        value = result_range.copy()
        uid = value.get("uid")
        if api.is_uid(uid) and uid != "0":
            return value

        # uid key does not exist or is not valid, try to infer it from keyword
        keyword = value.get("keyword")
        if keyword:
            query = dict(portal_type="AnalysisService", getKeyword=keyword)
            brains = api.search(query, SETUP_CATALOG)
            if len(brains) == 1:
                uid = api.get_uid(brains[0])
        value["uid"] = uid
        return value

    def resolve_conditions(self, analysis):
        """Returns the conditions to be applied to this analysis by merging
        those already set at sample level with defaults
        """
        service = analysis.getAnalysisService()
        default_conditions = service.getConditions()

        # Extract the conditions set for this analysis already
        existing = analysis.getConditions()
        existing_titles = [cond.get("title") for cond in existing]

        def is_missing(condition):
            return condition.get("title") not in existing_titles

        # Add only those conditions that are missing
        missing = filter(is_missing, default_conditions)

        # Sort them to match with same order as in service
        titles = [condition.get("title") for condition in default_conditions]

        def index(condition):
            cond_title = condition.get("title")
            if cond_title in titles:
                return titles.index(cond_title)
            return len(titles)

        conditions = existing + missing
        return sorted(conditions, key=lambda con: index(con))

    def add_analysis(self, instance, service, **kwargs):
        service_uid = api.get_uid(service)

        # Ensure we have suitable parameters
        specs = kwargs.get("specs") or {}

        # Get the hidden status for the service
        hidden = kwargs.get("hidden") or []
        hidden = filter(lambda d: d.get("uid") == service_uid, hidden)
        hidden = hidden and hidden[0].get("hidden") or service.getHidden()

        # Get the price for the service
        prices = kwargs.get("prices") or {}
        price = prices.get(service_uid) or service.getPrice()

        # Get the default result for the service
        default_result = service.getDefaultResult()

        # Gets the analysis or creates the analysis for this service
        # Note this returns a list, because it is possible to have multiple
        # partitions with same analysis
        analyses = self.resolve_analyses(instance, service)

        # Filter out analyses in detached states
        # This allows to re-add an analysis that was retracted or cancelled
        analyses = filter(
            lambda an: api.get_workflow_status_of(an) not in DETACHED_STATES,
            analyses)

        if not analyses:
            # Create the analysis
            analysis = create_analysis(instance, service)
            analyses.append(analysis)

        for analysis in analyses:
            # Set the hidden status
            analysis.setHidden(hidden)

            # Set the price of the Analysis
            analysis.setPrice(price)

            # Set the internal use status
            parent_sample = analysis.getRequest()
            analysis.setInternalUse(parent_sample.getInternalUse())

            # Set the default result to the analysis
            if not analysis.getResult() and default_result:
                analysis.setResult(default_result)
                analysis.setResultCaptureDate(None)

            # Set the result range to the analysis
            analysis_rr = specs.get(service_uid) or analysis.getResultsRange()
            analysis.setResultsRange(analysis_rr)

            # Set default (pre)conditions
            conditions = self.resolve_conditions(analysis)
            analysis.setConditions(conditions)

            analysis.reindexObject()

    def remove_analysis(self, analysis):
        """Removes a given analysis from the instance
        """
        # Remember assigned attachments
        # https://github.com/senaite/senaite.core/issues/1025
        attachments = analysis.getAttachment()
        analysis.setAttachment([])

        # If assigned to a worksheet, unassign it before deletion
        worksheet = analysis.getWorksheet()
        if worksheet:
            worksheet.removeAnalysis(analysis)

        # handle retest source deleted
        retest = analysis.getRetest()
        if retest:
            # unset reference link
            retest.setRetestOf(None)

        # Remove the analysis
        # Note the analysis might belong to a partition
        analysis.aq_parent.manage_delObjects(ids=[api.get_id(analysis)])

        # Remove orphaned attachments
        for attachment in attachments:
            if not attachment.getLinkedAnalyses():
                # only delete attachments which are no longer linked
                logger.info(
                    "Deleting attachment: {}".format(attachment.getId()))
                attachment_id = api.get_id(attachment)
                api.get_parent(attachment).manage_delObjects(attachment_id)

    def resolve_analyses(self, instance, service):
        """Resolves analyses for the service and instance
        It returns a list, because for a given sample, multiple analyses for
        the same service can exist due to the possibility of having multiple
        partitions
        """
        analyses = []

        # Does the analysis exist in this instance already?
        instance_analyses = self.get_from_instance(instance, service)

        if instance_analyses:
            analyses.extend(instance_analyses)

        # Does the analysis exist in an ancestor?
        from_ancestor = self.get_from_ancestor(instance, service)
        for ancestor_analysis in from_ancestor:
            # only move non-assigned analyses
            state = api.get_workflow_status_of(ancestor_analysis)
            if state != "unassigned":
                continue
            # Move the analysis into the partition
            analysis_id = api.get_id(ancestor_analysis)
            logger.info("Analysis {} is from an ancestor".format(analysis_id))
            cp = ancestor_analysis.aq_parent.manage_cutObjects(analysis_id)
            instance.manage_pasteObjects(cp)
            analyses.append(instance._getOb(analysis_id))

        # Does the analysis exist in descendants?
        from_descendant = self.get_from_descendant(instance, service)
        analyses.extend(from_descendant)

        return analyses

    def get_analyses_from_descendants(self, instance):
        """Returns all the analyses from descendants
        """
        analyses = []
        for descendant in instance.getDescendants(all_descendants=True):
            analyses.extend(descendant.objectValues("Analysis"))
        return analyses

    def get_from_instance(self, instance, service):
        """Returns analyses for the given service from the instance
        """
        service_uid = api.get_uid(service)
        analyses = instance.objectValues("Analysis")
        # Filter those analyses with same keyword. Note that a Sample can
        # contain more than one analysis with same keyword because of retests
        return filter(lambda an: an.getServiceUID() == service_uid, analyses)

    def get_from_ancestor(self, instance, service):
        """Returns analyses for the given service from ancestors
        """
        ancestor = instance.getParentAnalysisRequest()
        if not ancestor:
            return []

        analyses = self.get_from_instance(ancestor, service)
        return analyses or self.get_from_ancestor(ancestor, service)

    def get_from_descendant(self, instance, service):
        """Returns analyses for the given service from descendants
        """
        analyses = []
        for descendant in instance.getDescendants():
            # Does the analysis exist in the current descendant?
            descendant_analyses = self.get_from_instance(descendant, service)
            if descendant_analyses:
                analyses.extend(descendant_analyses)

            # Search in descendants from current descendant
            from_descendant = self.get_from_descendant(descendant, service)
            analyses.extend(from_descendant)

        return analyses

    def _to_service(self, thing):
        """Convert to Analysis Service

        :param thing: UID/Catalog Brain/Object/Something
        :returns: Analysis Service object or None
        """

        # Convert UIDs to objects
        if api.is_uid(thing):
            thing = api.get_object_by_uid(thing, None)

        # Bail out if the thing is not a valid object
        if not api.is_object(thing):
            logger.warn("'{}' is not a valid object!".format(repr(thing)))
            return None

        # Ensure we have an object here and not a brain
        obj = api.get_object(thing)

        if IAnalysisService.providedBy(obj):
            return obj

        if IAnalysis.providedBy(obj):
            return obj.getAnalysisService()

        # An object, but neither an Analysis nor AnalysisService?
        # This should never happen.
        portal_type = api.get_portal_type(obj)
        logger.error("ARAnalysesField doesn't accept objects from {} type. "
                     "The object will be dismissed.".format(portal_type))

        return None