Passed
Push — 2.x ( b119db...964d8f )
by Jordi
06:55
created

senaite.core.browser.samples.partition_magic   B

Complexity

Total Complexity 49

Size/Duplication

Total Lines 351
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 49
eloc 223
dl 0
loc 351
rs 8.48
c 0
b 0
f 0

19 Methods

Rating   Name   Duplication   Size   Complexity  
A PartitionMagicView.get_preservations() 0 9 1
A PartitionMagicView.get_preservation_data() 0 6 2
A PartitionMagicView.get_base_info() 0 15 1
A PartitionMagicView.get_sampletypes() 0 11 1
A PartitionMagicView.__init__() 0 6 1
A PartitionMagicView.get_object_by_uid() 0 8 2
A PartitionMagicView.get_analysis_data_for() 0 13 2
A PartitionMagicView.push_primary_analyses_for_removal() 0 6 1
C PartitionMagicView.get_template_data_for() 0 60 8
A PartitionMagicView.get_objects() 0 12 3
A PartitionMagicView.get_sampletype_data() 0 6 2
C PartitionMagicView.__call__() 0 74 11
A PartitionMagicView.redirect() 0 8 3
A PartitionMagicView.get_number_of_partitions_for() 0 19 4
A PartitionMagicView.add_status_message() 0 4 1
A PartitionMagicView.get_container_data() 0 6 2
A PartitionMagicView.get_containers() 0 9 1
A PartitionMagicView.to_super_model() 0 6 1
A PartitionMagicView.get_ar_data() 0 12 2

How to fix   Complexity   

Complexity

Complex classes like senaite.core.browser.samples.partition_magic often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2021 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import six
22
23
from collections import OrderedDict
24
from collections import defaultdict
25
26
from Products.Five.browser import BrowserView
27
from Products.Five.browser.pagetemplatefile import ViewPageTemplateFile
28
from bika.lims import api
29
from bika.lims import bikaMessageFactory as _
30
from bika.lims import logger
31
from bika.lims.decorators import returns_super_model
32
from bika.lims.utils.analysisrequest import create_partition
33
34
DEFAULT_NUMBER_OF_PARTITIONS = 0
35
36
37
class PartitionMagicView(BrowserView):
38
    """Manage Partitions of primary ARs
39
    """
40
    template = ViewPageTemplateFile("templates/partition_magic.pt")
41
42
    def __init__(self, context, request):
43
        super(PartitionMagicView, self).__init__(context, request)
44
        self.context = context
45
        self.request = request
46
        self.back_url = self.context.absolute_url()
47
        self.analyses_to_remove = dict()
48
49
    def __call__(self):
50
        form = self.request.form
51
52
        # Form submit toggle
53
        form_submitted = form.get("submitted", False)
54
55
        # Buttons
56
        form_preview = form.get("button_preview", False)
57
        form_create = form.get("button_create", False)
58
        form_cancel = form.get("button_cancel", False)
59
60
        objs = self.get_objects()
61
62
        # No ARs selected
63
        if not objs:
64
            return self.redirect(message=_("No items selected"),
65
                                 level="warning")
66
67
        # Handle preview
68
        if form_submitted and form_preview:
69
            logger.info("*** PREVIEW ***")
70
71
        # Handle create
72
        if form_submitted and form_create:
73
            logger.info("*** CREATE PARTITIONS ***")
74
75
            partitions = []
76
77
            # create the partitions
78
            for partition in form.get("partitions", []):
79
                primary_uid = partition.get("primary_uid")
80
                sampletype_uid = partition.get("sampletype_uid")
81
                container_uid = partition.get("container_uid")
82
                preservation_uid = partition.get("preservation_uid")
83
                internal_use = partition.get("internal_use")
84
                if not primary_uid:
85
                    continue
86
87
                # The creation of partitions w/o analyses is allowed. Maybe the
88
                # user wants to add the analyses later manually or wants to keep
89
                # this partition stored in a freezer for some time
90
                analyses_uids = partition.get("analyses", [])
91
                partition = create_partition(
92
                    request=self.request,
93
                    analysis_request=primary_uid,
94
                    sample_type=sampletype_uid,
95
                    container=container_uid,
96
                    preservation=preservation_uid,
97
                    analyses=analyses_uids,
98
                    internal_use=internal_use,
99
                )
100
                partitions.append(partition)
101
102
                # Remove analyses from primary once all partitions are created
103
                primary = api.get_object(primary_uid)
104
                self.push_primary_analyses_for_removal(primary, analyses_uids)
105
106
                logger.info("Successfully created partition: {}".format(
107
                    api.get_path(partition)))
108
109
            if not partitions:
110
                # If no partitions were created, show a warning message
111
                return self.redirect(message=_("No partitions were created"))
112
113
            message = _("Created {} partitions: {}".format(
114
                len(partitions), ", ".join(map(api.get_title, partitions))))
115
            return self.redirect(message=message)
116
117
        # Handle cancel
118
        if form_submitted and form_cancel:
119
            logger.info("*** CANCEL ***")
120
            return self.redirect(message=_("Partitioning canceled"))
121
122
        return self.template()
123
124
    def push_primary_analyses_for_removal(self, analysis_request, analyses):
125
        """Stores the analyses to be removed after partitions creation
126
        """
127
        to_remove = self.analyses_to_remove.get(analysis_request, [])
128
        to_remove.extend(analyses)
129
        self.analyses_to_remove[analysis_request] = list(set(to_remove))
130
131
    def get_ar_data(self):
132
        """Returns a list of AR data
133
        """
134
        for obj in self.get_objects():
135
            info = self.get_base_info(obj)
136
            info.update({
137
                "analyses": self.get_analysis_data_for(obj),
138
                "sampletype": self.get_base_info(obj.getSampleType()),
139
                "number_of_partitions": self.get_number_of_partitions_for(obj),
140
                "template": self.get_template_data_for(obj),
141
            })
142
            yield info
143
144
    def get_sampletype_data(self):
145
        """Returns a list of SampleType data
146
        """
147
        for obj in self.get_sampletypes():
148
            info = self.get_base_info(obj)
149
            yield info
150
151
    def get_container_data(self):
152
        """Returns a list of Container data
153
        """
154
        for obj in self.get_containers():
155
            info = self.get_base_info(obj)
156
            yield info
157
158
    def get_preservation_data(self):
159
        """Returns a list of Preservation data
160
        """
161
        for obj in self.get_preservations():
162
            info = self.get_base_info(obj)
163
            yield info
164
165
    def get_objects(self):
166
        """Returns a list of objects coming from the "uids" request parameter
167
        """
168
        # Create a mapping of source ARs for copy
169
        uids = self.request.form.get("uids", "")
170
        if not uids:
171
            # check for the `items` parammeter
172
            uids = self.request.form.get("items", "")
173
        if isinstance(uids, six.string_types):
174
            uids = uids.split(",")
175
        unique_uids = OrderedDict().fromkeys(uids).keys()
176
        return filter(None, map(self.get_object_by_uid, unique_uids))
177
178
    def get_sampletypes(self):
179
        """Returns the available SampleTypes of the system
180
        """
181
        query = {
182
            "portal_type": "SampleType",
183
            "sort_on": "sortable_title",
184
            "sort_order": "ascending",
185
            "is_active": True,
186
        }
187
        results = api.search(query, "senaite_catalog_setup")
188
        return map(api.get_object, results)
189
190
    def get_containers(self):
191
        """Returns the available Containers of the system
192
        """
193
        query = dict(portal_type="SampleContainer",
194
                     sort_on="sortable_title",
195
                     sort_order="ascending",
196
                     is_active=True)
197
        results = api.search(query, "senaite_catalog_setup")
198
        return map(api.get_object, results)
199
200
    def get_preservations(self):
201
        """Returns the available Preservations of the system
202
        """
203
        query = dict(portal_type="Preservation",
204
                     sort_on="sortable_title",
205
                     sort_order="ascending",
206
                     is_active=True)
207
        results = api.search(query, "senaite_catalog_setup")
208
        return map(api.get_object, results)
209
210
    @returns_super_model
211
    def to_super_model(self, obj_or_objs):
212
        """Returns a SuperModel for a given object or a list of Supermodels if
213
        a list of objects was passed in
214
        """
215
        return obj_or_objs
216
217
    def get_analysis_data_for(self, ar):
218
        """Return the Analysis data for this AR
219
        """
220
        # Exclude analyses from children (partitions)
221
        analyses = ar.objectValues("Analysis")
222
        out = []
223
        for an in analyses:
224
            info = self.get_base_info(an)
225
            info.update({
226
                "service_uid": an.getServiceUID(),
227
            })
228
            out.append(info)
229
        return out
230
231
    def get_template_data_for(self, ar):
232
        """Return the Template data for this AR
233
        """
234
        info = None
235
        template = ar.getTemplate()
236
        ar_sampletype_uid = api.get_uid(ar.getSampleType())
237
        ar_container_uid = ""
238
        if ar.getContainer():
239
            ar_container_uid = api.get_uid(ar.getContainer())
240
        ar_preservation_uid = ""
241
        if ar.getPreservation():
242
            ar_preservation_uid = api.get_uid(ar.getPreservation())
243
244
        if template:
245
            info = self.get_base_info(template)
246
247
            analyses = template.getAnalyses()
248
            partition_analyses = map(
249
                lambda x: (x.get("partition"), x.get("service_uid")), analyses)
250
251
            analyses_by_partition = defaultdict(list)
252
            for partition, service_uid in partition_analyses:
253
                analyses_by_partition[partition].append(service_uid)
254
255
            sampletypes_by_partition = defaultdict(list)
256
            containers_by_partition = defaultdict(list)
257
            preservations_by_partition = defaultdict(list)
258
            internal_use_by_partition = defaultdict(list)
259
            for part in template.getPartitions():
260
                part_id = part.get("part_id")
261
                sampletype_uid = part.get('sampletype_uid', ar_sampletype_uid)
262
                sampletypes_by_partition[part_id] = sampletype_uid
263
                container_uid = part.get("container_uid", ar_container_uid)
264
                containers_by_partition[part_id] = container_uid
265
                preserv_uid = part.get("preservation_uid", ar_preservation_uid)
266
                preservations_by_partition[part_id] = preserv_uid
267
                internal_use = part.get("internal_use", ar.getInternalUse())
268
                internal_use_by_partition[part_id] = internal_use
269
270
271
            partitions = map(lambda p: p.get("part_id"),
272
                             template.getPartitions())
273
            info.update({
274
                "analyses": analyses_by_partition,
275
                "partitions": partitions,
276
                "sample_types": sampletypes_by_partition,
277
                "containers": containers_by_partition,
278
                "preservations": preservations_by_partition,
279
                "internal_uses": internal_use_by_partition,
280
            })
281
        else:
282
            info = {
283
                "analyses": {},
284
                "partitions": [],
285
                "sample_types": {},
286
                "containers": {},
287
                "preservations": {},
288
                "internal_uses": {},
289
            }
290
        return info
291
292
    def get_number_of_partitions_for(self, ar):
293
        """Return the number of selected partitions
294
        """
295
        # fetch the number of partitions from the request
296
        uid = api.get_uid(ar)
297
        num = self.request.get("primary", {}).get(uid)
298
299
        if num is None:
300
            # get the number of partitions from the template
301
            template = ar.getTemplate()
302
            if template:
303
                num = len(template.getPartitions())
304
            else:
305
                num = DEFAULT_NUMBER_OF_PARTITIONS
306
        try:
307
            num = int(num)
308
        except (TypeError, ValueError):
309
            num = DEFAULT_NUMBER_OF_PARTITIONS
310
        return num
311
312
    def get_base_info(self, obj):
313
        """Extract the base info from the given object
314
        """
315
        obj = api.get_object(obj)
316
        review_state = api.get_workflow_status_of(obj)
317
        state_title = review_state.capitalize().replace("_", " ")
318
        return {
319
            "obj": obj,
320
            "id": api.get_id(obj),
321
            "uid": api.get_uid(obj),
322
            "title": api.get_title(obj),
323
            "path": api.get_path(obj),
324
            "url": api.get_url(obj),
325
            "review_state": review_state,
326
            "state_title": state_title,
327
        }
328
329
    def redirect(self, redirect_url=None, message=None, level="info"):
330
        """Redirect with a message
331
        """
332
        if redirect_url is None:
333
            redirect_url = self.back_url
334
        if message is not None:
335
            self.add_status_message(message, level)
336
        return self.request.response.redirect(redirect_url)
337
338
    def get_object_by_uid(self, uid):
339
        """Get the object by UID
340
        """
341
        logger.debug("get_object_by_uid::UID={}".format(uid))
342
        obj = api.get_object_by_uid(uid, None)
343
        if obj is None:
344
            logger.warn("!! No object found for UID #{} !!")
345
        return obj
346
347
    def add_status_message(self, message, level="info"):
348
        """Set a portal status message
349
        """
350
        return self.context.plone_utils.addPortalMessage(message, level)
351