senaite.core.datamanagers.field.sample_analyses   F

Complexity

Total Complexity 60

Size/Duplication

Total Lines 426
Duplicated Lines 4.23 %

Importance

Changes 0
Metric Value
wmc 60
eloc 207
dl 18
loc 426
rs 3.6
c 0
b 0
f 0

15 Methods

Rating   Name   Duplication   Size   Complexity  
A SampleAnalysesFieldDataManager.get() 0 25 2
A SampleAnalysesFieldDataManager.__init__() 0 4 1
A SampleAnalysesFieldDataManager.remove_analysis() 0 31 5
C SampleAnalysesFieldDataManager.set() 0 64 10
A SampleAnalysesFieldDataManager.resolve_range() 0 18 4
A SampleAnalysesFieldDataManager.get_from_ancestor() 0 9 2
A SampleAnalysesFieldDataManager.resolve_conditions() 0 28 3
A SampleAnalysesFieldDataManager.get_analyses_from_descendants() 0 7 2
A SampleAnalysesFieldDataManager._to_service() 0 31 5
A SampleAnalysesFieldDataManager.resolve_specs() 0 21 5
A SampleAnalysesFieldDataManager.resolve_uid() 18 18 5
A SampleAnalysesFieldDataManager.get_from_descendant() 0 15 3
A SampleAnalysesFieldDataManager.get_from_instance() 0 8 2
B SampleAnalysesFieldDataManager.add_analysis() 0 59 7
A SampleAnalysesFieldDataManager.resolve_analyses() 0 32 4


Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
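As a minimal sketch of that restructuring, the flagged resolve_uid() logic could live in one shared function that every caller delegates to. The report does not say where the second copy lives, so `SampleSide` and `OtherSide` are hypothetical stand-ins, and `find_uid_by_keyword` is an assumed callable replacing the catalog search. The `api.is_uid` check is reduced to a truthiness test here.

```python
def resolve_uid(result_range, find_uid_by_keyword):
    """Return a copy of result_range with its "uid" key resolved.

    find_uid_by_keyword is a hypothetical callable that returns the uid
    for a keyword, or None when there is no unambiguous match.
    """
    value = dict(result_range)
    uid = value.get("uid")
    # Simplified validity check (assumption): non-empty and not "0"
    if uid and uid != "0":
        return value

    # uid is missing or not valid, try to infer it from the keyword
    keyword = value.get("keyword")
    if keyword:
        uid = find_uid_by_keyword(keyword) or uid
    value["uid"] = uid
    return value


class SampleSide(object):
    """Hypothetical first owner of the formerly duplicated method."""

    def __init__(self, find_uid_by_keyword):
        self.find_uid_by_keyword = find_uid_by_keyword

    def resolve_uid(self, result_range):
        return resolve_uid(result_range, self.find_uid_by_keyword)


class OtherSide(object):
    """Hypothetical second owner; it delegates to the same function."""

    def __init__(self, find_uid_by_keyword):
        self.find_uid_by_keyword = find_uid_by_keyword

    def resolve_uid(self, result_range):
        return resolve_uid(result_range, self.find_uid_by_keyword)
```

Both classes keep their public method, so existing callers would not need to change while the logic exists only once.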


Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like senaite.core.datamanagers.field.sample_analyses often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
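The prefix heuristic can be tried directly on the method names from the table above. The following is only a rough sketch: the helper name `group_by_prefix` and the threshold of three methods per group are assumptions made for illustration.

```python
from collections import defaultdict

# Method names copied from the methods table of this report
METHOD_NAMES = [
    "get", "__init__", "remove_analysis", "set", "resolve_range",
    "get_from_ancestor", "resolve_conditions",
    "get_analyses_from_descendants", "_to_service", "resolve_specs",
    "resolve_uid", "get_from_descendant", "get_from_instance",
    "add_analysis", "resolve_analyses",
]


def group_by_prefix(names):
    """Group method names by the first word of their name."""
    groups = defaultdict(list)
    for name in names:
        # Ignore leading/trailing underscores of private and dunder names
        prefix = name.strip("_").split("_", 1)[0]
        groups[prefix].append(name)
    return dict(groups)


groups = group_by_prefix(METHOD_NAMES)

# Keep only prefixes shared by at least three methods (arbitrary threshold)
candidates = dict((p, n) for p, n in groups.items() if len(n) >= 3)

for prefix in sorted(candidates):
    print(prefix, candidates[prefix])
```

With this list, the `get` and `resolve` groups stand out with five methods each, which suggests the lookup helpers and the results-range helpers as possible components to extract.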

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
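A sketch of what Extract Class might look like for the results-range helpers is shown here. `ResultsRangeResolver` and `SlimDataManager` are hypothetical names, plain dicts stand in for the results ranges, and the uid lookup by keyword is left out, so every range is assumed to carry a "uid" key already.

```python
class ResultsRangeResolver(object):
    """Hypothetical extracted class that owns the resolve_* logic."""

    def __init__(self, sample_ranges):
        # Results ranges already stored on the sample (plain dicts here)
        self.sample_ranges = list(sample_ranges)

    def resolve_range(self, result_range):
        """Fill missing subfields from the matching sample range."""
        rr = dict(result_range)
        uid = rr.get("uid")
        for sample_rr in self.sample_ranges:
            if uid and sample_rr.get("uid") == uid:
                merged = dict(sample_rr)
                merged.update(rr)  # passed-in values win
                return merged
        return rr

    def resolve_specs(self, results_ranges):
        """Return a uid -> results range mapping."""
        rrs = [self.resolve_range(rr) for rr in results_ranges or []]
        uids = set(rr.get("uid") for rr in rrs)
        # Append the sample ranges that were not passed in
        rrs.extend(
            rr for rr in self.sample_ranges if rr.get("uid") not in uids)
        return dict((rr.get("uid"), rr) for rr in rrs)


class SlimDataManager(object):
    """Hypothetical slimmed-down manager that delegates to the resolver."""

    def __init__(self, sample_ranges):
        self.ranges = ResultsRangeResolver(sample_ranges)

    def set(self, specs):
        return self.ranges.resolve_specs(specs)


sample_ranges = [
    {"uid": "a", "min": 1, "max": 5},
    {"uid": "b", "min": 0, "max": 9},
]
manager = SlimDataManager(sample_ranges)
resolved = manager.set([{"uid": "a", "max": 7}])
```

The manager then only coordinates, while the merging rules can be tested in isolation against plain dictionaries.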

# -*- coding: utf-8 -*-
#
# This file is part of SENAITE.CORE.
#
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright 2018-2025 by it's authors.
# Some rights reserved, see README and LICENSE.

import itertools

from AccessControl import Unauthorized
from bika.lims import api
from bika.lims import logger
from bika.lims.api.security import check_permission
from bika.lims.interfaces import IAnalysis
from bika.lims.interfaces import IAnalysisService
from bika.lims.interfaces import ISubmitted
from bika.lims.utils.analysis import create_analysis
from senaite.core.catalog import ANALYSIS_CATALOG
from senaite.core.catalog import SETUP_CATALOG
from senaite.core.datamanagers.base import FieldDataManager
from senaite.core.permissions import AddAnalysis

DETACHED_STATES = ["cancelled", "retracted", "rejected"]


class SampleAnalysesFieldDataManager(FieldDataManager):
    """Data Manager for Routine Analyses
    """
    def __init__(self, context, request, field):
        self.context = context
        self.request = request
        self.field = field

    def get(self, **kw):
        """Returns a list of Analyses assigned to this AR

        Return a list of catalog brains unless `full_objects=True` is passed.
        Other keyword arguments are passed to senaite_catalog_analysis

        :param kw: Keyword arguments to inject in the search query
        :returns: A list of Analysis Objects/Catalog Brains
        """
        # Filter out parameters from kw that don't match with indexes
        catalog = api.get_tool(ANALYSIS_CATALOG)
        indexes = catalog.indexes()
        query = dict([(k, v) for k, v in kw.items() if k in indexes])

        query["portal_type"] = "Analysis"
        query["getAncestorsUIDs"] = api.get_uid(self.context)
        query["sort_on"] = kw.get("sort_on", "sortable_title")
        query["sort_order"] = kw.get("sort_order", "ascending")

        # Do the search against the catalog
        brains = catalog(query)
        if kw.get("full_objects", False):
            return map(api.get_object, brains)
        return list(brains)

    def set(self, items, prices, specs, hidden, **kw):
        """Set/Assign Analyses to this AR

        :param items: List of Analysis objects/brains, AnalysisService
                      objects/brains and/or Analysis Service uids
        :type items: list
        :param prices: Mapping of AnalysisService UID -> price
        :type prices: dict
        :param specs: List of AnalysisService UID -> Result Range mappings
        :type specs: list
        :param hidden: List of AnalysisService UID -> Hidden mappings
        :type hidden: list
        :returns: None. Analyses are added to and removed from the AR in place
        """

        if items is None:
            items = []

        # Bail out if items is not a list or tuple
        if not isinstance(items, (list, tuple)):
            raise TypeError(
                "Items parameter must be a tuple or list, got '{}'".format(
                    type(items)))

        # Bail out if the AR is inactive
        if not api.is_active(self.context):
            raise Unauthorized("Inactive ARs can not be modified")

        # Bail out if the user does not have the right permission
        if not check_permission(AddAnalysis, self.context):
            raise Unauthorized("You do not have the '{}' permission"
                               .format(AddAnalysis))

        # Convert the items to a valid list of AnalysisServices
        services = filter(None, map(self._to_service, items))

        # Calculate dependencies
        dependencies = map(lambda s: s.getServiceDependencies(), services)
        dependencies = list(itertools.chain.from_iterable(dependencies))

        # Merge dependencies and services
        services = set(services + dependencies)

        # Modify existing AR specs with new form values of selected analyses
        specs = self.resolve_specs(self.context, specs)

        # Add analyses
        params = dict(prices=prices, hidden=hidden, specs=specs)
        map(lambda s: self.add_analysis(self.context, s, **params), services)

        # Get all analyses (those from descendants included)
        analyses = self.context.objectValues("Analysis")
        analyses.extend(self.get_analyses_from_descendants(self.context))

        uids = map(api.get_uid, services)
        to_remove = filter(lambda an: an.getServiceUID() not in uids, analyses)
        # Retain submitted analyses
        to_remove = filter(lambda an: not ISubmitted.providedBy(an), to_remove)
        # Retain analyses from detached states
        to_remove = filter(lambda an: api.get_review_status(an)
                           not in DETACHED_STATES, to_remove)

        # Remove analyses
        map(self.remove_analysis, to_remove)

    def resolve_specs(self, instance, results_ranges):
        """Returns a dictionary where the key is the service_uid and the value
        is its results range. The dictionary is made by extending the
        results_ranges passed-in with the Sample's ResultsRanges (a copy of the
        specifications initially set)
        """
        rrs = results_ranges or []

        # Sample's Results ranges
        sample_rrs = instance.getResultsRange()

        # Ensure all subfields from specification are kept and missing values
        # for subfields are filled in accordance with the specs
        rrs = map(lambda rr: self.resolve_range(rr, sample_rrs), rrs)

        # Append those from sample that are missing in the ranges passed-in
        service_uids = map(lambda rr: rr["uid"], rrs)
        rrs.extend(filter(lambda rr: rr["uid"] not in service_uids, sample_rrs))

        # Create a dict for easy access to results ranges
        return dict(map(lambda rr: (rr["uid"], rr), rrs))

    def resolve_range(self, result_range, sample_result_ranges):
        """Resolves the range by adding the uid if not present and filling the
        missing subfield values with those that come from the Sample
        specification if they are not present in the result_range passed-in
        """
        # Resolve result_range to make sure it contains the uid subfield
        rrs = self.resolve_uid(result_range)
        uid = rrs.get("uid")

        for sample_rr in sample_result_ranges:
            if uid and sample_rr.get("uid") == uid:
                # Keep same fields from sample
                rr = sample_rr.copy()
                rr.update(rrs)
                return rr

        # Return the original with no changes
        return rrs

    def resolve_uid(self, result_range):
        """Resolves the uid key for the result_range passed in. If the uid is
        missing or not valid, it is inferred from the keyword, if present
        """
        value = result_range.copy()
        uid = value.get("uid")
        if api.is_uid(uid) and uid != "0":
            return value

        # uid key does not exist or is not valid, try to infer from keyword
        keyword = value.get("keyword")
        if keyword:
            query = dict(portal_type="AnalysisService", getKeyword=keyword)
            brains = api.search(query, SETUP_CATALOG)
            if len(brains) == 1:
                uid = api.get_uid(brains[0])
        value["uid"] = uid
        return value

    def resolve_conditions(self, analysis):
        """Returns the conditions to be applied to this analysis by merging
        those already set at sample level with defaults
        """
        service = analysis.getAnalysisService()
        default_conditions = service.getConditions()

        # Extract the conditions set for this analysis already
        existing = analysis.getConditions()
        existing_titles = [cond.get("title") for cond in existing]

        def is_missing(condition):
            return condition.get("title") not in existing_titles

        # Add only those conditions that are missing
        missing = filter(is_missing, default_conditions)

        # Sort them to match with same order as in service
        titles = [condition.get("title") for condition in default_conditions]

        def index(condition):
            cond_title = condition.get("title")
            if cond_title in titles:
                return titles.index(cond_title)
            return len(titles)

        conditions = existing + missing
        return sorted(conditions, key=lambda con: index(con))

    def add_analysis(self, instance, service, **kwargs):
        """Adds an analysis for the given service to the instance, or updates
        the existing ones, and applies hidden status, price, default result,
        results range and conditions
        """
        service_uid = api.get_uid(service)

        # Ensure we have suitable parameters
        specs = kwargs.get("specs") or {}

        # Get the hidden status for the service
        hidden = kwargs.get("hidden") or []
        hidden = filter(lambda d: d.get("uid") == service_uid, hidden)
        hidden = hidden and hidden[0].get("hidden") or service.getHidden()

        # Get the price for the service
        prices = kwargs.get("prices") or {}
        price = prices.get(service_uid) or service.getPrice()

        # Get the default result for the service
        default_result = service.getDefaultResult()

        # Gets the analysis or creates the analysis for this service
        # Note this returns a list, because it is possible to have multiple
        # partitions with the same analysis
        analyses = self.resolve_analyses(instance, service)

        # Filter out analyses in detached states
        # This allows re-adding an analysis that was retracted or cancelled
        analyses = filter(
            lambda an: api.get_workflow_status_of(an) not in DETACHED_STATES,
            analyses)

        if not analyses:
            # Create the analysis
            analysis = create_analysis(instance, service)
            analyses.append(analysis)

        for analysis in analyses:
            # Set the hidden status
            analysis.setHidden(hidden)

            # Set the price of the Analysis
            analysis.setPrice(price)

            # Set the internal use status
            parent_sample = analysis.getRequest()
            analysis.setInternalUse(parent_sample.getInternalUse())

            # Set the default result to the analysis
            if not analysis.getResult() and default_result:
                analysis.setResult(default_result)
                analysis.setResultCaptureDate(None)

            # Set the result range to the analysis
            analysis_rr = specs.get(service_uid) or analysis.getResultsRange()
            analysis.setResultsRange(analysis_rr)

            # Set default (pre)conditions
            conditions = self.resolve_conditions(analysis)
            analysis.setConditions(conditions)

            analysis.reindexObject()

    def remove_analysis(self, analysis):
        """Removes a given analysis from the instance
        """
        # Remember assigned attachments
        # https://github.com/senaite/senaite.core/issues/1025
        attachments = analysis.getAttachment()
        analysis.setAttachment([])

        # If assigned to a worksheet, unassign it before deletion
        worksheet = analysis.getWorksheet()
        if worksheet:
            worksheet.removeAnalysis(analysis)

        # Handle the case where the source of a retest is deleted
        retest = analysis.getRetest()
        if retest:
            # unset reference link
            retest.setRetestOf(None)

        # Remove the analysis
        # Note the analysis might belong to a partition
        analysis.aq_parent.manage_delObjects(ids=[api.get_id(analysis)])

        # Remove orphaned attachments
        for attachment in attachments:
            if not attachment.getLinkedAnalyses():
                # only delete attachments that are no longer linked
                logger.info(
                    "Deleting attachment: {}".format(attachment.getId()))
                attachment_id = api.get_id(attachment)
                api.get_parent(attachment).manage_delObjects(attachment_id)

    def resolve_analyses(self, instance, service):
        """Resolves analyses for the service and instance
        It returns a list, because for a given sample, multiple analyses for
        the same service can exist due to the possibility of having multiple
        partitions
        """
        analyses = []

        # Does the analysis exist in this instance already?
        instance_analyses = self.get_from_instance(instance, service)

        if instance_analyses:
            analyses.extend(instance_analyses)

        # Does the analysis exist in an ancestor?
        from_ancestor = self.get_from_ancestor(instance, service)
        for ancestor_analysis in from_ancestor:
            # only move non-assigned analyses
            state = api.get_workflow_status_of(ancestor_analysis)
            if state != "unassigned":
                continue
            # Move the analysis into the partition
            analysis_id = api.get_id(ancestor_analysis)
            logger.info("Analysis {} is from an ancestor".format(analysis_id))
            cp = ancestor_analysis.aq_parent.manage_cutObjects(analysis_id)
            instance.manage_pasteObjects(cp)
            analyses.append(instance._getOb(analysis_id))

        # Does the analysis exist in descendants?
        from_descendant = self.get_from_descendant(instance, service)
        analyses.extend(from_descendant)

        return analyses

    def get_analyses_from_descendants(self, instance):
        """Returns all the analyses from descendants
        """
        analyses = []
        for descendant in instance.getDescendants(all_descendants=True):
            analyses.extend(descendant.objectValues("Analysis"))
        return analyses

    def get_from_instance(self, instance, service):
        """Returns analyses for the given service from the instance
        """
        service_uid = api.get_uid(service)
        analyses = instance.objectValues("Analysis")
        # Filter those analyses for the same service. Note that a Sample can
        # contain more than one analysis for the same service due to retests
        return filter(lambda an: an.getServiceUID() == service_uid, analyses)

    def get_from_ancestor(self, instance, service):
        """Returns analyses for the given service from ancestors
        """
        ancestor = instance.getParentAnalysisRequest()
        if not ancestor:
            return []

        analyses = self.get_from_instance(ancestor, service)
        return analyses or self.get_from_ancestor(ancestor, service)

    def get_from_descendant(self, instance, service):
        """Returns analyses for the given service from descendants
        """
        analyses = []
        for descendant in instance.getDescendants():
            # Does the analysis exist in the current descendant?
            descendant_analyses = self.get_from_instance(descendant, service)
            if descendant_analyses:
                analyses.extend(descendant_analyses)

            # Search in descendants from current descendant
            from_descendant = self.get_from_descendant(descendant, service)
            analyses.extend(from_descendant)

        return analyses

    def _to_service(self, thing):
        """Convert to Analysis Service

        :param thing: UID/Catalog Brain/Object/Something
        :returns: Analysis Service object or None
        """

        # Convert UIDs to objects
        if api.is_uid(thing):
            thing = api.get_object_by_uid(thing, None)

        # Bail out if the thing is not a valid object
        if not api.is_object(thing):
            logger.warn("'{}' is not a valid object!".format(repr(thing)))
            return None

        # Ensure we have an object here and not a brain
        obj = api.get_object(thing)

        if IAnalysisService.providedBy(obj):
            return obj

        if IAnalysis.providedBy(obj):
            return obj.getAnalysisService()

        # An object, but neither an Analysis nor AnalysisService?
        # This should never happen.
        portal_type = api.get_portal_type(obj)
        logger.error("ARAnalysesField doesn't accept objects from {} type. "
                     "The object will be dismissed.".format(portal_type))
        return None
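
The removal step at the end of set() in the listing above applies three filters in sequence. The sketch below restates that rule with plain data, assuming a hypothetical `Analysis` namedtuple in place of the real content objects. The field names and the sample statuses are illustrative only.

```python
from collections import namedtuple

DETACHED_STATES = ["cancelled", "retracted", "rejected"]

# Hypothetical stand-in for an analysis object
Analysis = namedtuple("Analysis", "service_uid status submitted")


def select_removable(analyses, selected_uids):
    """Return the analyses that the removal step would delete."""
    removable = []
    for an in analyses:
        if an.service_uid in selected_uids:
            continue  # service is still selected, keep it
        if an.submitted:
            continue  # retain submitted analyses
        if an.status in DETACHED_STATES:
            continue  # retain analyses from detached states
        removable.append(an)
    return removable


analyses = [
    Analysis("uid-ca", "unassigned", False),      # still selected
    Analysis("uid-mg", "unassigned", False),      # deselected, removable
    Analysis("uid-fe", "to_be_verified", True),   # deselected but submitted
    Analysis("uid-zn", "retracted", False),       # deselected but detached
]
removable = select_removable(analyses, selected_uids=["uid-ca"])
```

Only the deselected analysis that is neither submitted nor detached ends up in `removable`, which matches the order of the three filter() calls in set().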