Passed
Push — 2.x ( 0ba30f...7ab1ba )
by Jordi
08:49
created

SampleTemplate.setPartitions()   B

Complexity

Conditions 7

Size

Total Lines 32
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 26
dl 0
loc 32
rs 7.856
c 0
b 0
f 0
cc 7
nop 2
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2024 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
from AccessControl import ClassSecurityInfo
22
from bika.lims import api
23
from bika.lims import senaiteMessageFactory as _
24
from bika.lims.interfaces import IDeactivable
25
from plone.autoform import directives
26
from plone.supermodel import model
27
from Products.CMFCore import permissions
28
from senaite.core.catalog import SETUP_CATALOG
29
from senaite.core.config.widgets import get_default_columns
30
from senaite.core.content.base import Container
31
from senaite.core.content.mixins import ClientAwareMixin
32
from senaite.core.interfaces import ISampleTemplate
33
from senaite.core.schema import UIDReferenceField
34
from senaite.core.schema.fields import DataGridRow
35
from senaite.core.z3cform.widgets.datagrid import DataGridWidgetFactory
36
from senaite.core.z3cform.widgets.listing.widget import ListingWidgetFactory
37
from senaite.core.z3cform.widgets.uidreference import UIDReferenceWidgetFactory
38
from zope import schema
39
from zope.deprecation import deprecate
40
from zope.interface import Interface
41
from zope.interface import implementer
42
43
44
class IServiceRecord(Interface):
45
    """Record schema for selected services and partitions
46
    """
47
    uid = schema.TextLine(title=u"Service UID")
48
    hidden = schema.Bool(title=u"Hidden")
49
    part_id = schema.TextLine(title=u"Partition ID")
50
51
52
class IPartitionRecord(Interface):
53
    """DataGrid Row for Sample Partition Schema
54
    """
55
56
    # PARTITION
57
    directives.widget("part_id",
58
                      style=u"width:100px!important")
59
    part_id = schema.TextLine(
60
        title=_(
61
            u"label_sampletemplate_partition_part_id",
62
            default=u"Partition ID"
63
        ),
64
        required=False,
65
        default=u"part-1")
66
67
    # CONTAINER
68
    directives.widget(
69
        "container",
70
        UIDReferenceWidgetFactory,
71
        catalog=SETUP_CATALOG,
72
        query={
73
            "portal_type": "SampleContainer",
74
            "is_active": True,
75
            "sort_on": "sortable_title",
76
            "sort_order": "ascending",
77
        },
78
        display_template="<a href='${url}'>${Title}</a>",
79
        columns=get_default_columns)
80
    container = UIDReferenceField(
81
        title=_(
82
            u"label_sampletemplate_partition_container",
83
            default=u"Container"
84
        ),
85
        allowed_types=("SampleContainer", ),
86
        multi_valued=False,
87
        required=False)
88
89
    # PRESERVATION
90
    directives.widget(
91
        "preservation",
92
        UIDReferenceWidgetFactory,
93
        catalog=SETUP_CATALOG,
94
        query={
95
            "portal_type": "SamplePreservation",
96
            "is_active": True,
97
            "sort_on": "sortable_title",
98
            "sort_order": "ascending",
99
        },
100
        display_template="<a href='${url}'>${Title}</a>",
101
        columns=get_default_columns)
102
    preservation = UIDReferenceField(
103
        title=_(
104
            u"label_sampletemplate_partition_preservation",
105
            default=u"Preservation"
106
        ),
107
        allowed_types=("SamplePreservation", ),
108
        multi_valued=False,
109
        required=False)
110
111
    # SAMPLE TYPE
112
    directives.widget(
113
        "sampletype",
114
        UIDReferenceWidgetFactory,
115
        catalog=SETUP_CATALOG,
116
        query={
117
            "portal_type": "SampleType",
118
            "is_active": True,
119
            "sort_on": "sortable_title",
120
            "sort_order": "ascending",
121
        },
122
        display_template="<a href='${url}'>${Title}</a>",
123
        columns=get_default_columns)
124
    sampletype = UIDReferenceField(
125
        title=_(
126
            u"label_sampletemplate_partition_sampletype",
127
            default=u"Sample Type"
128
        ),
129
        allowed_types=("SampleType", ),
130
        multi_valued=False,
131
        required=False)
132
133
134
class ISampleTemplateSchema(model.Schema):
135
    """Schema interface
136
    """
137
138
    # Fieldset "Partitions"
139
    model.fieldset(
140
        "partitions",
141
        label=_(u"fieldset_sampletemplate_partitions",
142
                default=u"Partitions"),
143
        fields=[
144
            "partitions",
145
            "auto_partition",
146
        ]
147
    )
148
149
    # Fieldset "Analyses"
150
    model.fieldset(
151
        "analyses",
152
        label=_(u"fieldset_sampletemplate_analyses",
153
                default=u"Analyses"),
154
        fields=[
155
            "services",
156
        ]
157
    )
158
159
    # Title
160
    title = schema.TextLine(
161
        title=_(u"title_sampletemplate_title",
162
                default=u"Name"),
163
        required=True)
164
165
    # Description
166
    description = schema.Text(
167
        title=_(u"title_sampletemplate_description",
168
                default=u"Description"),
169
        required=False)
170
171
    # Sample Point
172
    directives.widget(
173
        "samplepoint",
174
        UIDReferenceWidgetFactory,
175
        catalog=SETUP_CATALOG,
176
        query={
177
            "portal_type": "SamplePoint",
178
            "is_active": True,
179
            "sort_on": "sortable_title",
180
            "sort_order": "ascending",
181
        },
182
        display_template="<a href='${url}'>${Title}</a>",
183
        columns=get_default_columns)
184
    samplepoint = UIDReferenceField(
185
        title=_(u"label_sampletemplate_samplepoint",
186
                default=u"Sample Point"),
187
        description=_(u"description_sampletemplate_samplepoint",
188
                      default=u"Select the sample point for this template"),
189
        allowed_types=("SamplePoint", ),
190
        multi_valued=False,
191
        required=False)
192
193
    # Sample Type
194
    directives.widget(
195
        "sampletype",
196
        UIDReferenceWidgetFactory,
197
        catalog=SETUP_CATALOG,
198
        query={
199
            "portal_type": "SampleType",
200
            "is_active": True,
201
            "sort_on": "sortable_title",
202
            "sort_order": "ascending",
203
        },
204
        display_template="<a href='${url}'>${Title}</a>",
205
        columns=get_default_columns)
206
    sampletype = UIDReferenceField(
207
        title=_(u"label_sampletemplate_sampletype",
208
                default=u"Sample Type"),
209
        description=_(u"description_sampletemplate_sampletype",
210
                      default=u"Select the sample type for this template"),
211
        allowed_types=("SampleType", ),
212
        multi_valued=False,
213
        required=False)
214
215
    # Composite
216
    composite = schema.Bool(
217
        title=_(u"title_sampletemplate_composite",
218
                default=u"Composite"),
219
        description=_(u"description_sampletemplate_composite",
220
                      default=u"Select if the sample is a mix of sub samples"),
221
        default=False,
222
        required=False)
223
224
    # Sampling Required
225
    sampling_required = schema.Bool(
226
        title=_(u"title_sampletemplate_sampling_required",
227
                default=u"Sample collected by the laboratory"),
228
        description=_(u"description_sampletemplate_sampling_required",
229
                      default=u"Enable sampling workflow for the created "
230
                              u"samples"),
231
        default=False,
232
        required=False)
233
234
    # Partitions (Data Grid)
235
    directives.widget(
236
        "partitions",
237
        DataGridWidgetFactory,
238
        allow_insert=False,  # only auto append
239
        allow_delete=True,
240
        allow_reorder=True,
241
        auto_append=True)
242
    partitions = schema.List(
243
        title=_(u"label_sampletemplate_partitions",
244
                default=u"Partitions"),
245
        description=_(u"description_sampletemplate_partitions",
246
                      default=u"Each line defines a new partition "
247
                      u"identified by the 'Partition ID'"),
248
        value_type=DataGridRow(
249
            title=u"Partition Scheme",
250
            schema=IPartitionRecord),
251
        required=True,
252
        default=[])
253
254
    # Auto Partition
255
    auto_partition = schema.Bool(
256
        title=_(u"title_sampletemplate_auto_partition",
257
                default=u"Auto-partition on sample reception"),
258
        description=_(u"description_sampletemplate_auto_partition",
259
                      default=u"Automatically redirect the user to the "
260
                              u"partitions view when the sample is received"),
261
        default=False,
262
        required=False)
263
264
    # Services
265
    directives.widget("services",
266
                      ListingWidgetFactory,
267
                      listing_view="sampletemplate_services_widget")
268
    services = schema.List(
269
        title=_(
270
            u"title_sampletemplate_services",
271
            default=u"Services"
272
        ),
273
        description=_(
274
            u"description_sampletemplate_services",
275
            default=u"Select the services for this template"
276
        ),
277
        value_type=DataGridRow(schema=IServiceRecord),
278
        default=[],
279
        required=True,
280
    )
281
282
283
@implementer(ISampleTemplate, ISampleTemplateSchema, IDeactivable)
284
class SampleTemplate(Container, ClientAwareMixin):
285
    """SampleTemplate
286
    """
287
    # Catalogs where this type will be catalogued
288
    _catalogs = [SETUP_CATALOG]
289
290
    security = ClassSecurityInfo()
291
292
    @security.protected(permissions.View)
293
    def getRawSamplePoint(self):
294
        accessor = self.accessor("samplepoint", raw=True)
295
        return accessor(self)
296
297
    @security.protected(permissions.View)
298
    def getSamplePoint(self):
299
        samplepoint = self.getRawSamplePoint()
300
        if not samplepoint:
301
            return None
302
        return api.get_object(samplepoint)
303
304
    @security.protected(permissions.ModifyPortalContent)
305
    def setSamplePoint(self, value):
306
        mutator = self.mutator("samplepoint")
307
        mutator(self, value)
308
309
    @deprecate("deprecated since SENAITE 2.6: Use getRawSamplePoint() instead")
310
    @security.protected(permissions.View)
311
    def getSamplePointUID(self):
312
        return self.getRawSamplePoint()
313
314
    # BBB: AT schema field property
315
    SamplePoint = property(getSamplePoint, setSamplePoint)
316
    SamplePointUID = property(getRawSamplePoint)
317
318
    @security.protected(permissions.View)
319
    def getRawSampleType(self):
320
        accessor = self.accessor("sampletype", raw=True)
321
        return accessor(self)
322
323
    @security.protected(permissions.View)
324
    def getSampleType(self):
325
        sampletype = self.getRawSampleType()
326
        if not sampletype:
327
            return None
328
        return api.get_object(sampletype)
329
330
    @security.protected(permissions.ModifyPortalContent)
331
    def setSampleType(self, value):
332
        mutator = self.mutator("sampletype")
333
        mutator(self, value)
334
335
    # BBB: AT schema field property
336
    SampleType = property(getSampleType, setSampleType)
337
338
    @security.protected(permissions.View)
339
    def getComposite(self):
340
        accessor = self.accessor("composite")
341
        return bool(accessor(self))
342
343
    @security.protected(permissions.ModifyPortalContent)
344
    def setComposite(self, value):
345
        mutator = self.mutator("composite")
346
        mutator(self, bool(value))
347
348
    # BBB: AT schema field property
349
    Composite = property(getComposite, setComposite)
350
351
    @security.protected(permissions.View)
352
    def getSamplingRequired(self):
353
        accessor = self.accessor("sampling_required")
354
        return bool(accessor(self))
355
356
    @security.protected(permissions.ModifyPortalContent)
357
    def setSamplingRequired(self, value):
358
        mutator = self.mutator("sampling_required")
359
        mutator(self, bool(value))
360
361
    @security.protected(permissions.View)
362
    def getSamplingRequiredDefaultValue(self):
363
        """Get the setup if the sampling workflow is enabled or not
364
        """
365
        bikasetup = api.get_bika_setup()
366
        return bikasetup.getSamplingWorkflowEnabled()
367
368
    # BBB: AT schema field property
369
    SamplingRequired = property(getSamplingRequired, setSamplingRequired)
370
371
    @security.protected(permissions.View)
372
    def getPartitions(self):
373
        accessor = self.accessor("partitions")
374
        return accessor(self) or []
375
376
    @security.protected(permissions.ModifyPortalContent)
377
    def setPartitions(self, value):
378
        """Set partitions for the template
379
        """
380
        if not isinstance(value, (list, dict)):
381
            raise TypeError(
382
                "Expected a dict or list, got %r" % type(value))
383
        if isinstance(value, dict):
384
            value = [value]
385
        records = []
386
        for v in value:
387
            part_id = v.get("part_id", "")
388
            container = v.get("container", "")
389
            preservation = v.get("preservation", "")
390
            sampletype = v.get("sampletype", "")
391
392
            # ensure UIDs for reference fields
393
            if container:
394
                container = api.get_uid(container)
395
            if preservation:
396
                preservation = api.get_uid(preservation)
397
            if sampletype:
398
                sampletype = api.get_uid(sampletype)
399
400
            records.append({
401
                "part_id": part_id,
402
                "container": container,
403
                "preservation": preservation,
404
                "sampletype": sampletype,
405
            })
406
        mutator = self.mutator("partitions")
407
        mutator(self, records)
408
409
    # BBB: AT schema field property
410
    Partitions = property(getPartitions, setPartitions)
411
412
    @security.protected(permissions.View)
413
    def getAutoPartition(self):
414
        accessor = self.accessor("auto_partition")
415
        return bool(accessor(self))
416
417
    @security.protected(permissions.ModifyPortalContent)
418
    def setAutoPartition(self, value):
419
        mutator = self.mutator("auto_partition")
420
        mutator(self, bool(value))
421
422
    # BBB: AT schema field property
423
    AutoPartition = property(getAutoPartition, setAutoPartition)
424
425
    @security.protected(permissions.View)
426
    def getRawServices(self):
427
        """Return the raw value of the services field
428
429
        >>> self.getRawServices()
430
        [{'uid': '...', 'part_id': 'part-1', 'hidden': False}, ...]
431
432
        :returns: List of dicts including `uid`, `hidden` and `part_id`
433
        """
434
        accessor = self.accessor("services")
435
        return accessor(self) or []
436
437
    @security.protected(permissions.View)
438
    def getServices(self):
439
        """Returns a list of service objects
440
441
        >>> self.getServices()
442
        [<AnalysisService at ...>,  <AnalysisService at ...>, ...]
443
444
        :returns: List of analysis service objects
445
        """
446
        records = self.getRawServices()
447
        service_uids = map(lambda r: r.get("uid"), records)
448
        return list(map(api.get_object, service_uids))
449
450
    @security.protected(permissions.ModifyPortalContent)
451
    def setServices(self, value):
452
        """Set services for the template
453
454
        This method accepts either a list of analysis service objects, a list
455
        of analysis service UIDs or a list of analysis profile service records
456
        containing the keys `uid`, `hidden` and `part_id`:
457
458
        >>> self.setServices([<AnalysisService at ...>, ...])
459
        >>> self.setServices(['353e1d9bd45d45dbabc837114a9c41e6', '...', ...])
460
        >>> self.setServices([{'hidden': False, 'uid': '...'}, ...])
461
462
        Raises a TypeError if the value does not match any allowed type.
463
        """
464
        if not isinstance(value, list):
465
            value = [value]
466
467
        records = []
468
        for v in value:
469
            uid = ""
470
            hidden = False
471
            part_id = ""
472
            if isinstance(v, dict):
473
                uid = api.get_uid(v.get("uid"))
474
                hidden = v.get("hidden",
475
                               api.get_object(uid).getHidden())
476
                part_id = v.get("part_id", "")
477
            elif api.is_object(v):
478
                uid = api.get_uid(v)
479
                hidden = v.getHidden()
480
            elif api.is_uid(v):
481
                obj = api.get_object(v)
482
                uid = v
483
                hidden = obj.getHidden()
484
            else:
485
                raise TypeError(
486
                    "Expected object, uid or record, got %r" % type(v))
487
            records.append({
488
                "uid": uid,
489
                "hidden": hidden,
490
                "part_id": part_id,
491
            })
492
493
        mutator = self.mutator("services")
494
        mutator(self, records)
495
496
    # BBB: AT schema field property
497
    Services = property(getServices, setServices)
498
499
    @deprecate("deprecated since SENAITE 2.6: Use getRawServices() instead")
500
    @security.protected(permissions.View)
501
    def getAnalysisServicesSettings(self):
502
        """BBB: Return the settings for all assigned services
503
504
        :returns: List of dicts including `uid`, `hidden` and `part_id`
505
        """
506
        # Note: We store the selected service UIDs, hidden and part_id
507
        # settings in the `services` field. Therefore, we can just return the
508
        # raw value.
509
        return self.getRawServices()
510
511
    @security.protected(permissions.ModifyPortalContent)
512
    def setAnalysisServicesSettings(self, settings):
513
        """BBB: Update settings for all assigned service UIDs
514
515
        This method expects a list of dictionaries containing the service
516
        `uid`, `part_id` and the `hidden` setting.
517
518
        This is basically the same format as stored in the `services` field!
519
520
        However, we want to just update the settings for selected service UIDs"
521
522
        >>> settings =  [{'uid': '...', 'hidden': False, 'part_id': 'part-1'}]
523
        >>> setAnalysisServicesSettings(settings)
524
        """
525
        if not isinstance(settings, list):
526
            settings = [settings]
527
528
        by_uid = self.get_services_by_uid()
529
530
        for setting in settings:
531
            if not isinstance(setting, dict):
532
                raise TypeError(
533
                    "Expected a record containing `uid`, `hidden` and"
534
                    "`partition`, got %s" % type(setting))
535
            uid = api.get_uid(setting.get("uid"))
536
            hidden = setting.get("hidden", api.get_object(uid).getHidden())
537
            part_id = setting.get("part_id", "")
538
539
            if not uid:
540
                raise ValueError("UID is missing in setting %r" % setting)
541
542
            record = by_uid.get(api.get_uid(uid))
543
            if not record:
544
                continue
545
            record["hidden"] = hidden
546
            record["part_id"] = part_id
547
548
        # set back the new services
549
        self.setServices(by_uid.values())
550
551
    @security.protected(permissions.View)
552
    def getAnalysisServiceSettings(self, service):
553
        """Returns the settings for a single service
554
        """
555
        uid = api.get_uid(service)
556
        by_uid = self.get_services_by_uid()
557
        record = by_uid.get(uid, {
558
            "uid": uid,
559
            "part_id": "",
560
            "hidden": False,
561
        })
562
        return record
563
564
    @security.protected(permissions.View)
565
    def isAnalysisServiceHidden(self, service):
566
        """Check if the service is configured as hidden
567
        """
568
        uid = api.get_uid(service)
569
        services = self.get_services_by_uid()
570
        record = services.get(uid)
571
        if not record:
572
            obj = api.get_object(service)
573
            return obj.getRawHidden()
574
        return record.get("hidden", False)
575
576
    @security.protected(permissions.View)
577
    def getAnalysisServicePartitionID(self, service):
578
        """Get the assigned parition for the service
579
        """
580
        uid = api.get_uid(service)
581
        services = self.get_services_by_uid()
582
        record = services.get(uid)
583
        if not record:
584
            return ""
585
        return record.get("part_id", "")
586
587
    @security.protected(permissions.View)
588
    def getAnalysisServiceUIDs(self):
589
        """Returns a list of all assigned service UIDs
590
        """
591
        services = self.getRawServices()
592
        return list(map(lambda record: record.get("uid"), services))
593
594
    @security.protected(permissions.View)
595
    def get_services_by_uid(self):
596
        """Return the selected services grouped by UID
597
        """
598
        records = {}
599
        for record in self.services:
600
            records[record.get("uid")] = record
601
        return records
602
603 View Code Duplication
    def remove_service(self, service):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
604
        """Remove the service from the template
605
606
        If the service is not selected in the profile, returns False.
607
608
        NOTE: This method is used when an Analysis Service was deactivated.
609
610
        :param service: The service to be removed from this template
611
        :type service: AnalysisService
612
        :return: True if the AnalysisService has been removed successfully
613
        """
614
        # get the UID of the service that should be removed
615
        uid = api.get_uid(service)
616
        # get the current raw value of the services field.
617
        current_services = self.getRawServices()
618
        # filter out the UID of the service
619
        new_services = filter(
620
            lambda record: record.get("uid") != uid, current_services)
621
622
        # check if the service was removed or not
623
        current_services_count = len(current_services)
624
        new_services_count = len(new_services)
625
626
        if current_services_count == new_services_count:
627
            # service was not part of the profile
628
            return False
629
630
        # set the new services
631
        self.setServices(new_services)
632
633
        return True
634