Passed
Push — 2.x ( ca0980...375d4d )
by Ramon
08:08
created

SampleTemplate.setServices()   C

Complexity

Conditions 10

Size

Total Lines 56
Code Lines 38

Duplication

Lines 10
Ratio 17.86 %

Importance

Changes 0
Metric Value
eloc 38
dl 10
loc 56
rs 5.9999
c 0
b 0
f 0
cc 10
nop 3

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method, i.e. moving a cohesive part of the method body into a new, well-named method.

Complexity

Complex classes like senaite.core.content.sampletemplate.SampleTemplate.setServices() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2024 by its authors.
19
# Some rights reserved, see README and LICENSE.
20
21
from AccessControl import ClassSecurityInfo
22
from bika.lims import api
23
from bika.lims import senaiteMessageFactory as _
24
from bika.lims.interfaces import IDeactivable
25
from plone.autoform import directives
26
from plone.supermodel import model
27
from Products.CMFCore import permissions
28
from senaite.core.catalog import SETUP_CATALOG
29
from senaite.core.config.widgets import get_default_columns
30
from senaite.core.content.base import Container
31
from senaite.core.content.mixins import ClientAwareMixin
32
from senaite.core.interfaces import ISampleTemplate
33
from senaite.core.schema import UIDReferenceField
34
from senaite.core.schema.fields import DataGridRow
35
from senaite.core.z3cform.widgets.datagrid import DataGridWidgetFactory
36
from senaite.core.z3cform.widgets.listing.widget import ListingWidgetFactory
37
from senaite.core.z3cform.widgets.uidreference import UIDReferenceWidgetFactory
38
from zope import schema
39
from zope.deprecation import deprecate
40
from zope.interface import Interface
41
from zope.interface import implementer
42
43
44
class IServiceRecord(Interface):
    """Record schema of a single selected service

    A record consists of the service UID, the hidden flag and the ID of the
    partition the service is assigned to.
    """
    uid = schema.TextLine(
        title=u"Service UID",
    )
    hidden = schema.Bool(
        title=u"Hidden",
    )
    part_id = schema.TextLine(
        title=u"Partition ID",
    )
50
51
52
class IPartitionRecord(Interface):
    """Row schema of the partitions data grid

    A row holds a partition ID and three single-valued UID references
    (container, preservation and sample type). None of the fields is
    required.
    """

    # Partition ID (rendered with a fixed width)
    directives.widget(
        "part_id",
        style=u"width:100px!important",
    )
    part_id = schema.TextLine(
        title=_(u"label_sampletemplate_partition_part_id",
                default=u"Partition ID"),
        default=u"part-1",
        required=False,
    )

    # Container: active sample containers from the setup catalog
    directives.widget(
        "container",
        UIDReferenceWidgetFactory,
        catalog=SETUP_CATALOG,
        query={
            "portal_type": "SampleContainer",
            "is_active": True,
            "sort_on": "sortable_title",
            "sort_order": "ascending",
        },
        columns=get_default_columns,
        display_template="<a href='${url}'>${Title}</a>",
    )
    container = UIDReferenceField(
        title=_(u"label_sampletemplate_partition_container",
                default=u"Container"),
        allowed_types=("SampleContainer", ),
        multi_valued=False,
        required=False,
    )

    # Preservation: active sample preservations from the setup catalog
    directives.widget(
        "preservation",
        UIDReferenceWidgetFactory,
        catalog=SETUP_CATALOG,
        query={
            "portal_type": "SamplePreservation",
            "is_active": True,
            "sort_on": "sortable_title",
            "sort_order": "ascending",
        },
        columns=get_default_columns,
        display_template="<a href='${url}'>${Title}</a>",
    )
    preservation = UIDReferenceField(
        title=_(u"label_sampletemplate_partition_preservation",
                default=u"Preservation"),
        allowed_types=("SamplePreservation", ),
        multi_valued=False,
        required=False,
    )

    # Sample Type: active sample types from the setup catalog
    directives.widget(
        "sampletype",
        UIDReferenceWidgetFactory,
        catalog=SETUP_CATALOG,
        query={
            "portal_type": "SampleType",
            "is_active": True,
            "sort_on": "sortable_title",
            "sort_order": "ascending",
        },
        columns=get_default_columns,
        display_template="<a href='${url}'>${Title}</a>",
    )
    sampletype = UIDReferenceField(
        title=_(u"label_sampletemplate_partition_sampletype",
                default=u"Sample Type"),
        allowed_types=("SampleType", ),
        multi_valued=False,
        required=False,
    )
132
133
134
class ISampleTemplateSchema(model.Schema):
    """Schema of the sample template content type

    Besides the default fields, the schema declares the two fieldsets
    "partitions" and "analyses".
    """

    # Fieldset "Partitions": partition grid and auto-partition flag
    model.fieldset(
        "partitions",
        label=_(u"fieldset_sampletemplate_partitions",
                default=u"Partitions"),
        fields=["partitions", "auto_partition"],
    )

    # Fieldset "Analyses": selected services
    model.fieldset(
        "analyses",
        label=_(u"fieldset_sampletemplate_analyses",
                default=u"Analyses"),
        fields=["services"],
    )

    # Title (the only mandatory text field)
    title = schema.TextLine(
        title=_(u"title_sampletemplate_title",
                default=u"Name"),
        required=True,
    )

    # Description
    description = schema.Text(
        title=_(u"title_sampletemplate_description",
                default=u"Description"),
        required=False,
    )

    # Sample Point: active sample points from the setup catalog
    directives.widget(
        "samplepoint",
        UIDReferenceWidgetFactory,
        catalog=SETUP_CATALOG,
        query={
            "portal_type": "SamplePoint",
            "is_active": True,
            "sort_on": "sortable_title",
            "sort_order": "ascending",
        },
        columns=get_default_columns,
        display_template="<a href='${url}'>${Title}</a>",
    )
    samplepoint = UIDReferenceField(
        title=_(u"label_sampletemplate_samplepoint",
                default=u"Sample Point"),
        description=_(u"description_sampletemplate_samplepoint",
                      default=u"Select the sample point for this template"),
        allowed_types=("SamplePoint", ),
        multi_valued=False,
        required=False,
    )

    # Sample Type: active sample types from the setup catalog
    directives.widget(
        "sampletype",
        UIDReferenceWidgetFactory,
        catalog=SETUP_CATALOG,
        query={
            "portal_type": "SampleType",
            "is_active": True,
            "sort_on": "sortable_title",
            "sort_order": "ascending",
        },
        columns=get_default_columns,
        display_template="<a href='${url}'>${Title}</a>",
    )
    sampletype = UIDReferenceField(
        title=_(u"label_sampletemplate_sampletype",
                default=u"Sample Type"),
        description=_(u"description_sampletemplate_sampletype",
                      default=u"Select the sample type for this template"),
        allowed_types=("SampleType", ),
        multi_valued=False,
        required=False,
    )

    # Composite
    composite = schema.Bool(
        title=_(u"title_sampletemplate_composite",
                default=u"Composite"),
        description=_(u"description_sampletemplate_composite",
                      default=u"Select if the sample is a mix of sub samples"),
        required=False,
        default=False,
    )

    # Sampling Required
    sampling_required = schema.Bool(
        title=_(u"title_sampletemplate_sampling_required",
                default=u"Sample collected by the laboratory"),
        description=_(u"description_sampletemplate_sampling_required",
                      default=u"Enable sampling workflow for the created "
                              u"samples"),
        required=False,
        default=False,
    )

    # Partitions (Data Grid): rows can only be added via auto append
    directives.widget(
        "partitions",
        DataGridWidgetFactory,
        allow_insert=False,
        allow_delete=True,
        allow_reorder=True,
        auto_append=True,
    )
    partitions = schema.List(
        title=_(u"label_sampletemplate_partitions",
                default=u"Partitions"),
        description=_(u"description_sampletemplate_partitions",
                      default=u"Each line defines a new partition "
                              u"identified by the 'Partition ID'"),
        value_type=DataGridRow(
            title=u"Partition Scheme",
            schema=IPartitionRecord,
        ),
        required=True,
        default=[],
    )

    # Auto Partition
    auto_partition = schema.Bool(
        title=_(u"title_sampletemplate_auto_partition",
                default=u"Auto-partition on sample reception"),
        description=_(u"description_sampletemplate_auto_partition",
                      default=u"Automatically redirect the user to the "
                              u"partitions view when the sample is received"),
        required=False,
        default=False,
    )

    # Services: rendered by the listing view given below
    directives.widget(
        "services",
        ListingWidgetFactory,
        listing_view="sampletemplate_services_widget",
    )
    services = schema.List(
        title=_(u"title_sampletemplate_services",
                default=u"Services"),
        description=_(u"description_sampletemplate_services",
                      default=u"Select the services for this template"),
        value_type=DataGridRow(schema=IServiceRecord),
        required=True,
        default=[],
    )
281
282
283
@implementer(ISampleTemplate, ISampleTemplateSchema, IDeactivable)
284
class SampleTemplate(Container, ClientAwareMixin):
285
    """SampleTemplate
286
    """
287
    # Catalogs where this type will be catalogued
288
    _catalogs = [SETUP_CATALOG]
289
290
    security = ClassSecurityInfo()
291
292
    @security.protected(permissions.View)
    def getRawSamplePoint(self):
        # Read the `samplepoint` field through its raw accessor
        return self.accessor("samplepoint", raw=True)(self)
296
297
    @security.protected(permissions.View)
    def getSamplePoint(self):
        # Resolve the raw `samplepoint` value to an object; None if unset
        raw = self.getRawSamplePoint()
        return api.get_object(raw) if raw else None
303
304
    @security.protected(permissions.ModifyPortalContent)
    def setSamplePoint(self, value):
        # Write the value through the mutator of the `samplepoint` field
        self.mutator("samplepoint")(self, value)
308
309
    @deprecate("deprecated since SENAITE 2.6: Use getRawSamplePoint() instead")
    @security.protected(permissions.View)
    def getSamplePointUID(self):
        # BBB: plain alias of `getRawSamplePoint`
        raw_samplepoint = self.getRawSamplePoint()
        return raw_samplepoint
313
314
    # BBB: AT schema field property
315
    SamplePoint = property(getSamplePoint, setSamplePoint)
316
    SamplePointUID = property(getRawSamplePoint)
317
318
    @security.protected(permissions.View)
    def getRawSampleType(self):
        # Read the `sampletype` field through its raw accessor
        return self.accessor("sampletype", raw=True)(self)
322
323
    @security.protected(permissions.View)
    def getSampleType(self):
        # Resolve the raw `sampletype` value to an object; None if unset
        raw = self.getRawSampleType()
        return api.get_object(raw) if raw else None
329
330
    @security.protected(permissions.ModifyPortalContent)
    def setSampleType(self, value):
        # Write the value through the mutator of the `sampletype` field
        self.mutator("sampletype")(self, value)
334
335
    # BBB: AT schema field property
336
    SampleType = property(getSampleType, setSampleType)
337
338
    @security.protected(permissions.View)
    def getComposite(self):
        # The stored `composite` value is always reported as a bool
        stored = self.accessor("composite")(self)
        return bool(stored)
342
343
    @security.protected(permissions.ModifyPortalContent)
    def setComposite(self, value):
        # Coerce to bool before handing the value to the field mutator
        flag = bool(value)
        self.mutator("composite")(self, flag)
347
348
    # BBB: AT schema field property
349
    Composite = property(getComposite, setComposite)
350
351
    @security.protected(permissions.View)
    def getSamplingRequired(self):
        # The stored `sampling_required` value is always reported as a bool
        stored = self.accessor("sampling_required")(self)
        return bool(stored)
355
356
    @security.protected(permissions.ModifyPortalContent)
    def setSamplingRequired(self, value):
        # Coerce to bool before handing the value to the field mutator
        flag = bool(value)
        self.mutator("sampling_required")(self, flag)
360
361
    @security.protected(permissions.View)
    def getSamplingRequiredDefaultValue(self):
        """Return the sampling workflow setting of the setup

        :returns: The value of `getSamplingWorkflowEnabled()` of the setup
                  object returned by `api.get_bika_setup()`
        """
        return api.get_bika_setup().getSamplingWorkflowEnabled()
367
368
    # BBB: AT schema field property
369
    SamplingRequired = property(getSamplingRequired, setSamplingRequired)
370
371
    @security.protected(permissions.View)
    def getPartitions(self):
        # Return copies of the stored partition records, where the reference
        # sub-fields `sampletype`, `container` and `preservation` are
        # flattened to a single value (or "" if empty).
        accessor = self.accessor("partitions")
        records = accessor(self) or []

        # UIDReference sub-fields are stored as a list!
        # See https://github.com/senaite/senaite.core/pull/2630
        def first(record, subfield):
            value = record.get(subfield)
            if not value:
                return ""
            if api.is_uid(value):
                # `setPartitions` writes plain UIDs: indexing such a string
                # would return its first character only
                return value
            return value[0]

        partitions = []
        for record in records:
            # work on a copy to leave the stored record untouched
            partition = record.copy()
            partition.update({
                "sampletype": first(record, "sampletype"),
                "container": first(record, "container"),
                "preservation": first(record, "preservation"),
            })
            partitions.append(partition)

        return partitions
393
394
    @security.protected(permissions.ModifyPortalContent)
    def setPartitions(self, value):
        """Set partitions for the template

        Only the keys `part_id`, `container`, `preservation` and `sampletype`
        are taken over from the passed in records. Non-empty reference values
        are converted with `api.get_uid` before they are stored.

        :param value: A single partition record (dict) or a list of them
        :raises TypeError: if the value is neither a dict nor a list
        """
        if isinstance(value, dict):
            value = [value]
        elif not isinstance(value, list):
            raise TypeError(
                "Expected a dict or list, got %r" % type(value))

        reference_keys = ("container", "preservation", "sampletype")

        records = []
        for item in value:
            record = {"part_id": item.get("part_id", "")}
            for key in reference_keys:
                reference = item.get(key, "")
                # ensure UIDs for reference fields, keep empty values as-is
                record[key] = api.get_uid(reference) if reference else reference
            records.append(record)

        self.mutator("partitions")(self, records)
426
427
    # BBB: AT schema field property
428
    Partitions = property(getPartitions, setPartitions)
429
430
    @security.protected(permissions.View)
    def getAutoPartition(self):
        # The stored `auto_partition` value is always reported as a bool
        stored = self.accessor("auto_partition")(self)
        return bool(stored)
434
435
    @security.protected(permissions.ModifyPortalContent)
    def setAutoPartition(self, value):
        # Coerce to bool before handing the value to the field mutator
        flag = bool(value)
        self.mutator("auto_partition")(self, flag)
439
440
    # BBB: AT schema field property
441
    AutoPartition = property(getAutoPartition, setAutoPartition)
442
443
    @security.protected(permissions.View)
    def getRawServices(self):
        """Return the stored records of the services field

        >>> self.getRawServices()
        [{'uid': '...', 'part_id': 'part-1', 'hidden': False}, ...]

        :returns: List of dicts including `uid`, `hidden` and `part_id`,
                  or an empty list if the field holds no value
        """
        stored = self.accessor("services")(self)
        return stored or []
454
455
    @security.protected(permissions.View)
    def getServices(self, active_only=True):
        """Return the objects of the selected services

        >>> self.getServices()
        [<AnalysisService at ...>,  <AnalysisService at ...>, ...]

        :param active_only: If True, inactive services are left out
        :returns: List of analysis service objects
        """
        services = [api.get_object(uid) for uid in self.getRawServiceUIDs()]
        if not active_only:
            return services
        # filter out inactive services
        return [service for service in services if api.is_active(service)]
469
470
    @security.protected(permissions.ModifyPortalContent)
    def setServices(self, value, keep_inactive=True):
        """Set services for the template

        This method accepts either a list of analysis service objects, a list
        of analysis service UIDs or a list of analysis profile service records
        containing the keys `uid`, `hidden` and `part_id`:

        >>> self.setServices([<AnalysisService at ...>, ...])
        >>> self.setServices(['353e1d9bd45d45dbabc837114a9c41e6', '...', ...])
        >>> self.setServices([{'hidden': False, 'uid': '...'}, ...])

        A value that is not a list is treated as a single item.

        :param value: Service object(s), UID(s) or service record(s)
        :param keep_inactive: If True, currently stored records of inactive
                              services are kept, even if not contained in
                              the passed in value
        :raises TypeError: if an item does not match any allowed type
        """
        if not isinstance(value, list):
            value = [value]

        records = [self._to_service_record(item) for item in value]

        if keep_inactive:
            # keep inactive services so they come up again when reactivated
            records.extend(self._get_inactive_service_records(records))

        mutator = self.mutator("services")
        mutator(self, records)

    def _to_service_record(self, item):
        """Convert a service object, UID or record to a service record
        """
        part_id = ""
        if isinstance(item, dict):
            uid = api.get_uid(item.get("uid"))
            if "hidden" in item:
                hidden = item["hidden"]
            else:
                # only look up the service if no explicit setting was given
                hidden = api.get_object(uid).getHidden()
            part_id = item.get("part_id", "")
        elif api.is_object(item):
            uid = api.get_uid(item)
            hidden = item.getHidden()
        elif api.is_uid(item):
            uid = item
            hidden = api.get_object(item).getHidden()
        else:
            raise TypeError(
                "Expected object, uid or record, got %r" % type(item))
        return {
            "uid": uid,
            "hidden": hidden,
            "part_id": part_id,
        }

    def _get_inactive_service_records(self, records):
        """Return the stored records of inactive services not in `records`
        """
        selected = {record.get("uid") for record in records}
        inactive = []
        for record in self.getRawServices():
            uid = record.get("uid")
            if uid in selected:
                continue
            if not api.is_active(api.get_object(uid)):
                inactive.append(record)
        return inactive
526
527
    # BBB: AT schema field property
528
    Services = property(getServices, setServices)
529
530
    @security.protected(permissions.View)
    def getServiceUIDs(self, active_only=True):
        """Return the UIDs of the selected services

        :param active_only: If True, only UIDs of active services are returned
        :returns: A list of unique identifiers (UIDs)
        """
        if not active_only:
            # no need to look up the objects
            return self.getRawServiceUIDs()
        active = self.getServices(active_only=active_only)
        return [api.get_uid(service) for service in active]
541
542
    @security.protected(permissions.View)
    def getRawServiceUIDs(self):
        """Return the `uid` value of each stored service record

        :returns: A list of UIDs extracted from the raw 'Services' data.
        """
        return [record.get("uid") for record in self.getRawServices()]
550
551
    @deprecate("deprecated since SENAITE 2.6: Use getRawServices() instead")
    @security.protected(permissions.View)
    def getAnalysisServicesSettings(self):
        """BBB: Return the settings for all assigned services

        The `services` field already holds the UID together with the
        `hidden` and `part_id` settings of each selected service, so the raw
        field value is returned unchanged.

        :returns: List of dicts including `uid`, `hidden` and `part_id`
        """
        settings = self.getRawServices()
        return settings
562
563
    @security.protected(permissions.ModifyPortalContent)
    def setAnalysisServicesSettings(self, settings):
        """BBB: Update settings for all assigned service UIDs

        This method expects a list of dictionaries containing the service
        `uid`, `part_id` and the `hidden` setting.

        This is basically the same format as stored in the `services` field!

        However, we want to just update the settings for selected service
        UIDs. Settings for services that are not selected are skipped.

        >>> settings =  [{'uid': '...', 'hidden': False, 'part_id': 'part-1'}]
        >>> setAnalysisServicesSettings(settings)

        :param settings: A single settings record or a list of them
        :raises TypeError: if a setting is not a dict
        :raises ValueError: if a setting contains no UID
        """
        if not isinstance(settings, list):
            settings = [settings]

        by_uid = self.get_services_by_uid()

        for setting in settings:
            if not isinstance(setting, dict):
                raise TypeError(
                    "Expected a record containing `uid`, `hidden` and "
                    "`part_id`, got %s" % type(setting))

            # check for the UID before anything is looked up with it
            raw_uid = setting.get("uid")
            uid = api.get_uid(raw_uid) if raw_uid else None
            if not uid:
                raise ValueError("UID is missing in setting %r" % setting)

            record = by_uid.get(uid)
            if not record:
                # service is not selected in this template
                continue

            if "hidden" in setting:
                hidden = setting["hidden"]
            else:
                # fall back to the setting of the service itself
                hidden = api.get_object(uid).getHidden()

            record["hidden"] = hidden
            record["part_id"] = setting.get("part_id", "")

        # set back the new services
        # NOTE: `setServices` wraps everything that is not a list
        self.setServices(list(by_uid.values()))
602
603
    @security.protected(permissions.View)
    def getAnalysisServiceSettings(self, service):
        """Return the settings record of a single service

        :param service: Value that `api.get_uid` can resolve to a UID
        :returns: The stored record of the service or, if the service is not
                  selected, a new record that is not hidden and has no
                  partition assigned
        """
        uid = api.get_uid(service)
        records = self.get_services_by_uid()
        if uid in records:
            return records[uid]
        return {"uid": uid, "part_id": "", "hidden": False}
615
616
    @security.protected(permissions.View)
    def isAnalysisServiceHidden(self, service):
        """Check if the service is configured as hidden

        :param service: Value that `api.get_uid` can resolve to a UID
        :returns: The `hidden` setting of the stored record or, if there is
                  no record, the value of `getRawHidden()` of the service
        """
        uid = api.get_uid(service)
        record = self.get_services_by_uid().get(uid)
        if record:
            return record.get("hidden", False)
        # no settings stored in this template
        return api.get_object(service).getRawHidden()
627
628
    @security.protected(permissions.View)
    def getAnalysisServicePartitionID(self, service):
        """Get the ID of the partition assigned to the service

        :param service: Value that `api.get_uid` can resolve to a UID
        :returns: The `part_id` of the stored record or an empty string
        """
        uid = api.get_uid(service)
        record = self.get_services_by_uid().get(uid)
        return record.get("part_id", "") if record else ""
638
639
    @deprecate("deprecated since SENAITE 2.6: Use getServiceUIDs() instead")
    @security.protected(permissions.View)
    def getAnalysisServiceUIDs(self):
        """BBB: Return the result of `getServiceUIDs()` with its defaults
        """
        uids = self.getServiceUIDs()
        return uids
645
646
    @security.protected(permissions.View)
    def get_services_by_uid(self):
        """Return the selected service records keyed by their UID

        :returns: Dict of UID -> service record. If a UID occurs more than
                  once, the last record wins.
        """
        return {record.get("uid"): record for record in self.services}
654
655 View Code Duplication
    def remove_service(self, service):
        """Remove the service from the template

        If the service is not selected in the template, returns False.

        NOTE: This method is used when an Analysis Service was deactivated.

        :param service: The service to be removed from this template
        :type service: AnalysisService
        :return: True if the AnalysisService has been removed successfully
        """
        # get the UID of the service that should be removed
        uid = api.get_uid(service)
        # get the current raw value of the services field.
        current_services = self.getRawServices()
        # filter out the UID of the service
        # (a real list is needed for `len` and for `setServices`)
        new_services = [
            record for record in current_services
            if record.get("uid") != uid
        ]

        if len(new_services) == len(current_services):
            # service was not part of the template
            return False

        # set the new services
        # NOTE(review): `setServices` keeps stored records of inactive
        # services by default (`keep_inactive=True`), so a service that is
        # already inactive at this point gets appended again -- confirm
        # whether this is intended.
        self.setServices(new_services)

        return True
686