Passed
Push — 2.x ( 2d92fb...6121d6 )
by Ramon
07:19
created

SampleTemplate.getSamplePointUID()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 4
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2024 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
from AccessControl import ClassSecurityInfo
22
from bika.lims import api
23
from bika.lims import senaiteMessageFactory as _
24
from bika.lims.interfaces import IDeactivable
25
from plone.autoform import directives
26
from plone.supermodel import model
27
from Products.CMFCore import permissions
28
from senaite.core.catalog import SETUP_CATALOG
29
from senaite.core.config.widgets import get_default_columns
30
from senaite.core.content.base import Container
31
from senaite.core.content.mixins import ClientAwareMixin
32
from senaite.core.interfaces import ISampleTemplate
33
from senaite.core.schema import UIDReferenceField
34
from senaite.core.schema.fields import DataGridRow
35
from senaite.core.z3cform.widgets.datagrid import DataGridWidgetFactory
36
from senaite.core.z3cform.widgets.listing.widget import ListingWidgetFactory
37
from senaite.core.z3cform.widgets.uidreference import UIDReferenceWidgetFactory
38
from zope import schema
39
from zope.deprecation import deprecate
40
from zope.interface import Interface
41
from zope.interface import implementer
42
43
44
class IServiceRecord(Interface):
45
    """Record schema for selected services and partitions
46
    """
47
    uid = schema.TextLine(title=u"Service UID")
48
    hidden = schema.Bool(title=u"Hidden")
49
    part_id = schema.TextLine(title=u"Partition ID")
50
51
52
class IPartitionRecord(Interface):
53
    """DataGrid Row for Sample Partition Schema
54
    """
55
56
    # PARTITION
57
    directives.widget("part_id",
58
                      style=u"width:100px!important")
59
    part_id = schema.TextLine(
60
        title=_(
61
            u"label_sampletemplate_partition_part_id",
62
            default=u"Partition ID"
63
        ),
64
        required=False,
65
        default=u"part-1")
66
67
    # CONTAINER
68
    directives.widget(
69
        "container",
70
        UIDReferenceWidgetFactory,
71
        catalog=SETUP_CATALOG,
72
        query={
73
            "portal_type": "SampleContainer",
74
            "is_active": True,
75
            "sort_on": "sortable_title",
76
            "sort_order": "ascending",
77
        },
78
        display_template="<a href='${url}'>${Title}</a>",
79
        columns=get_default_columns)
80
    container = UIDReferenceField(
81
        title=_(
82
            u"label_sampletemplate_partition_container",
83
            default=u"Container"
84
        ),
85
        allowed_types=("SampleContainer", ),
86
        multi_valued=False,
87
        required=False)
88
89
    # PRESERVATION
90
    directives.widget(
91
        "preservation",
92
        UIDReferenceWidgetFactory,
93
        catalog=SETUP_CATALOG,
94
        query={
95
            "portal_type": "SamplePreservation",
96
            "is_active": True,
97
            "sort_on": "sortable_title",
98
            "sort_order": "ascending",
99
        },
100
        display_template="<a href='${url}'>${Title}</a>",
101
        columns=get_default_columns)
102
    preservation = UIDReferenceField(
103
        title=_(
104
            u"label_sampletemplate_partition_preservation",
105
            default=u"Preservation"
106
        ),
107
        allowed_types=("SamplePreservation", ),
108
        multi_valued=False,
109
        required=False)
110
111
    # SAMPLE TYPE
112
    directives.widget(
113
        "sampletype",
114
        UIDReferenceWidgetFactory,
115
        catalog=SETUP_CATALOG,
116
        query={
117
            "portal_type": "SampleType",
118
            "is_active": True,
119
            "sort_on": "sortable_title",
120
            "sort_order": "ascending",
121
        },
122
        display_template="<a href='${url}'>${Title}</a>",
123
        columns=get_default_columns)
124
    sampletype = UIDReferenceField(
125
        title=_(
126
            u"label_sampletemplate_partition_sampletype",
127
            default=u"Sample Type"
128
        ),
129
        allowed_types=("SampleType", ),
130
        multi_valued=False,
131
        required=False)
132
133
134
class ISampleTemplateSchema(model.Schema):
135
    """Schema interface
136
    """
137
138
    # Fieldset "Partitions"
139
    model.fieldset(
140
        "partitions",
141
        label=_(u"fieldset_sampletemplate_partitions",
142
                default=u"Partitions"),
143
        fields=[
144
            "partitions",
145
            "auto_partition",
146
        ]
147
    )
148
149
    # Fieldset "Analyses"
150
    model.fieldset(
151
        "analyses",
152
        label=_(u"fieldset_sampletemplate_analyses",
153
                default=u"Analyses"),
154
        fields=[
155
            "services",
156
        ]
157
    )
158
159
    # Title
160
    title = schema.TextLine(
161
        title=_(u"title_sampletemplate_title",
162
                default=u"Name"),
163
        required=True)
164
165
    # Description
166
    description = schema.Text(
167
        title=_(u"title_sampletemplate_description",
168
                default=u"Description"),
169
        required=False)
170
171
    # Sample Point
172
    directives.widget(
173
        "samplepoint",
174
        UIDReferenceWidgetFactory,
175
        catalog=SETUP_CATALOG,
176
        query={
177
            "portal_type": "SamplePoint",
178
            "is_active": True,
179
            "sort_on": "sortable_title",
180
            "sort_order": "ascending",
181
        },
182
        display_template="<a href='${url}'>${Title}</a>",
183
        columns=get_default_columns)
184
    samplepoint = UIDReferenceField(
185
        title=_(u"label_sampletemplate_samplepoint",
186
                default=u"Sample Point"),
187
        description=_(u"description_sampletemplate_samplepoint",
188
                      default=u"Select the sample point for this template"),
189
        allowed_types=("SamplePoint", ),
190
        multi_valued=False,
191
        required=False)
192
193
    # Sample Type
194
    directives.widget(
195
        "sampletype",
196
        UIDReferenceWidgetFactory,
197
        catalog=SETUP_CATALOG,
198
        query={
199
            "portal_type": "SampleType",
200
            "is_active": True,
201
            "sort_on": "sortable_title",
202
            "sort_order": "ascending",
203
        },
204
        display_template="<a href='${url}'>${Title}</a>",
205
        columns=get_default_columns)
206
    sampletype = UIDReferenceField(
207
        title=_(u"label_sampletemplate_sampletype",
208
                default=u"Sample Type"),
209
        description=_(u"description_sampletemplate_sampletype",
210
                      default=u"Select the sample type for this template"),
211
        allowed_types=("SampleType", ),
212
        multi_valued=False,
213
        required=False)
214
215
    # Composite
216
    composite = schema.Bool(
217
        title=_(u"title_sampletemplate_composite",
218
                default=u"Composite"),
219
        description=_(u"description_sampletemplate_composite",
220
                      default=u"Select if the sample is a mix of sub samples"),
221
        default=False,
222
        required=False)
223
224
    # Sampling Required
225
    sampling_required = schema.Bool(
226
        title=_(u"title_sampletemplate_sampling_required",
227
                default=u"Sample collected by the laboratory"),
228
        description=_(u"description_sampletemplate_sampling_required",
229
                      default=u"Enable sampling workflow for the created "
230
                              u"samples"),
231
        default=False,
232
        required=False)
233
234
    # Partitions (Data Grid)
235
    directives.widget(
236
        "partitions",
237
        DataGridWidgetFactory,
238
        allow_insert=False,  # only auto append
239
        allow_delete=True,
240
        allow_reorder=True,
241
        auto_append=True)
242
    partitions = schema.List(
243
        title=_(u"label_sampletemplate_partitions",
244
                default=u"Partitions"),
245
        description=_(u"description_sampletemplate_partitions",
246
                      default=u"Each line defines a new partition "
247
                      u"identified by the 'Partition ID'"),
248
        value_type=DataGridRow(
249
            title=u"Partition Scheme",
250
            schema=IPartitionRecord),
251
        required=True,
252
        default=[])
253
254
    # Auto Partition
255
    auto_partition = schema.Bool(
256
        title=_(u"title_sampletemplate_auto_partition",
257
                default=u"Auto-partition on sample reception"),
258
        description=_(u"description_sampletemplate_auto_partition",
259
                      default=u"Automatically redirect the user to the "
260
                              u"partitions view when the sample is received"),
261
        default=False,
262
        required=False)
263
264
    # Services
265
    directives.widget("services",
266
                      ListingWidgetFactory,
267
                      listing_view="sampletemplate_services_widget")
268
    services = schema.List(
269
        title=_(
270
            u"title_sampletemplate_services",
271
            default=u"Services"
272
        ),
273
        description=_(
274
            u"description_sampletemplate_services",
275
            default=u"Select the services for this template"
276
        ),
277
        value_type=DataGridRow(schema=IServiceRecord),
278
        default=[],
279
        required=True,
280
    )
281
282
283
@implementer(ISampleTemplate, ISampleTemplateSchema, IDeactivable)
284
class SampleTemplate(Container, ClientAwareMixin):
285
    """SampleTemplate
286
    """
287
    # Catalogs where this type will be catalogued
288
    _catalogs = [SETUP_CATALOG]
289
290
    security = ClassSecurityInfo()
291
292
    @security.protected(permissions.View)
293
    def getRawSamplePoint(self):
294
        accessor = self.accessor("samplepoint", raw=True)
295
        return accessor(self)
296
297
    @security.protected(permissions.View)
298
    def getSamplePoint(self):
299
        samplepoint = self.getRawSamplePoint()
300
        if not samplepoint:
301
            return None
302
        return api.get_object(samplepoint)
303
304
    @security.protected(permissions.ModifyPortalContent)
305
    def setSamplePoint(self, value):
306
        mutator = self.mutator("samplepoint")
307
        mutator(self, value)
308
309
    @deprecate("deprecated since SENAITE 2.6: Use getRawSamplePoint() instead")
310
    @security.protected(permissions.View)
311
    def getSamplePointUID(self):
312
        return self.getRawSamplePoint()
313
314
    # BBB: AT schema field property
315
    SamplePoint = property(getSamplePoint, setSamplePoint)
316
    SamplePointUID = property(getRawSamplePoint)
317
318
    @security.protected(permissions.View)
319
    def getRawSampleType(self):
320
        accessor = self.accessor("sampletype", raw=True)
321
        return accessor(self)
322
323
    @security.protected(permissions.View)
324
    def getSampleType(self):
325
        sampletype = self.getRawSampleType()
326
        if not sampletype:
327
            return None
328
        return api.get_object(sampletype)
329
330
    @security.protected(permissions.ModifyPortalContent)
331
    def setSampleType(self, value):
332
        mutator = self.mutator("sampletype")
333
        mutator(self, value)
334
335
    # BBB: AT schema field property
336
    SampleType = property(getSampleType, setSampleType)
337
338
    @security.protected(permissions.View)
339
    def getComposite(self):
340
        accessor = self.accessor("composite")
341
        return bool(accessor(self))
342
343
    @security.protected(permissions.ModifyPortalContent)
344
    def setComposite(self, value):
345
        mutator = self.mutator("composite")
346
        mutator(self, bool(value))
347
348
    # BBB: AT schema field property
349
    Composite = property(getComposite, setComposite)
350
351
    @security.protected(permissions.View)
352
    def getSamplingRequired(self):
353
        accessor = self.accessor("sampling_required")
354
        return bool(accessor(self))
355
356
    @security.protected(permissions.ModifyPortalContent)
357
    def setSamplingRequired(self, value):
358
        mutator = self.mutator("sampling_required")
359
        mutator(self, bool(value))
360
361
    @security.protected(permissions.View)
362
    def getSamplingRequiredDefaultValue(self):
363
        """Get the setup if the sampling workflow is enabled or not
364
        """
365
        bikasetup = api.get_bika_setup()
366
        return bikasetup.getSamplingWorkflowEnabled()
367
368
    # BBB: AT schema field property
369
    SamplingRequired = property(getSamplingRequired, setSamplingRequired)
370
371
    @security.protected(permissions.View)
372
    def getPartitions(self):
373
        accessor = self.accessor("partitions")
374
        records = accessor(self) or []
375
376
        # UIDReference sub-fields are stored as a list!
377
        # See https://github.com/senaite/senaite.core/pull/2630
378
        def first(record, subfield):
379
            items = record.get(subfield)
380
            return items[0] if items else ""
381
382
        partitions = []
383
        for record in records:
384
            partition = record.copy()
385
            partition.update({
386
                "sampletype": first(record, "sampletype"),
387
                "container": first(record, "container"),
388
                "preservation": first(record, "preservation"),
389
            })
390
            partitions.append(partition)
391
392
        return partitions
393
394
    @security.protected(permissions.ModifyPortalContent)
395
    def setPartitions(self, value):
396
        """Set partitions for the template
397
        """
398
        if not isinstance(value, (list, dict)):
399
            raise TypeError(
400
                "Expected a dict or list, got %r" % type(value))
401
        if isinstance(value, dict):
402
            value = [value]
403
        records = []
404
        for v in value:
405
            part_id = v.get("part_id", "")
406
            container = v.get("container", "")
407
            preservation = v.get("preservation", "")
408
            sampletype = v.get("sampletype", "")
409
410
            # ensure UIDs for reference fields
411
            if container:
412
                container = api.get_uid(container)
413
            if preservation:
414
                preservation = api.get_uid(preservation)
415
            if sampletype:
416
                sampletype = api.get_uid(sampletype)
417
418
            records.append({
419
                "part_id": part_id,
420
                "container": container,
421
                "preservation": preservation,
422
                "sampletype": sampletype,
423
            })
424
        mutator = self.mutator("partitions")
425
        mutator(self, records)
426
427
    # BBB: AT schema field property
428
    Partitions = property(getPartitions, setPartitions)
429
430
    @security.protected(permissions.View)
431
    def getAutoPartition(self):
432
        accessor = self.accessor("auto_partition")
433
        return bool(accessor(self))
434
435
    @security.protected(permissions.ModifyPortalContent)
436
    def setAutoPartition(self, value):
437
        mutator = self.mutator("auto_partition")
438
        mutator(self, bool(value))
439
440
    # BBB: AT schema field property
441
    AutoPartition = property(getAutoPartition, setAutoPartition)
442
443
    @security.protected(permissions.View)
444
    def getRawServices(self):
445
        """Return the raw value of the services field
446
447
        >>> self.getRawServices()
448
        [{'uid': '...', 'part_id': 'part-1', 'hidden': False}, ...]
449
450
        :returns: List of dicts including `uid`, `hidden` and `part_id`
451
        """
452
        accessor = self.accessor("services")
453
        return accessor(self) or []
454
455
    @security.protected(permissions.View)
456
    def getServices(self):
457
        """Returns a list of service objects
458
459
        >>> self.getServices()
460
        [<AnalysisService at ...>,  <AnalysisService at ...>, ...]
461
462
        :returns: List of analysis service objects
463
        """
464
        records = self.getRawServices()
465
        service_uids = map(lambda r: r.get("uid"), records)
466
        return list(map(api.get_object, service_uids))
467
468
    @security.protected(permissions.ModifyPortalContent)
469
    def setServices(self, value):
470
        """Set services for the template
471
472
        This method accepts either a list of analysis service objects, a list
473
        of analysis service UIDs or a list of analysis profile service records
474
        containing the keys `uid`, `hidden` and `part_id`:
475
476
        >>> self.setServices([<AnalysisService at ...>, ...])
477
        >>> self.setServices(['353e1d9bd45d45dbabc837114a9c41e6', '...', ...])
478
        >>> self.setServices([{'hidden': False, 'uid': '...'}, ...])
479
480
        Raises a TypeError if the value does not match any allowed type.
481
        """
482
        if not isinstance(value, list):
483
            value = [value]
484
485
        records = []
486
        for v in value:
487
            uid = ""
488
            hidden = False
489
            part_id = ""
490
            if isinstance(v, dict):
491
                uid = api.get_uid(v.get("uid"))
492
                hidden = v.get("hidden",
493
                               api.get_object(uid).getHidden())
494
                part_id = v.get("part_id", "")
495
            elif api.is_object(v):
496
                uid = api.get_uid(v)
497
                hidden = v.getHidden()
498
            elif api.is_uid(v):
499
                obj = api.get_object(v)
500
                uid = v
501
                hidden = obj.getHidden()
502
            else:
503
                raise TypeError(
504
                    "Expected object, uid or record, got %r" % type(v))
505
            records.append({
506
                "uid": uid,
507
                "hidden": hidden,
508
                "part_id": part_id,
509
            })
510
511
        mutator = self.mutator("services")
512
        mutator(self, records)
513
514
    # BBB: AT schema field property
515
    Services = property(getServices, setServices)
516
517
    @deprecate("deprecated since SENAITE 2.6: Use getRawServices() instead")
518
    @security.protected(permissions.View)
519
    def getAnalysisServicesSettings(self):
520
        """BBB: Return the settings for all assigned services
521
522
        :returns: List of dicts including `uid`, `hidden` and `part_id`
523
        """
524
        # Note: We store the selected service UIDs, hidden and part_id
525
        # settings in the `services` field. Therefore, we can just return the
526
        # raw value.
527
        return self.getRawServices()
528
529
    @security.protected(permissions.ModifyPortalContent)
530
    def setAnalysisServicesSettings(self, settings):
531
        """BBB: Update settings for all assigned service UIDs
532
533
        This method expects a list of dictionaries containing the service
534
        `uid`, `part_id` and the `hidden` setting.
535
536
        This is basically the same format as stored in the `services` field!
537
538
        However, we want to just update the settings for selected service UIDs"
539
540
        >>> settings =  [{'uid': '...', 'hidden': False, 'part_id': 'part-1'}]
541
        >>> setAnalysisServicesSettings(settings)
542
        """
543
        if not isinstance(settings, list):
544
            settings = [settings]
545
546
        by_uid = self.get_services_by_uid()
547
548
        for setting in settings:
549
            if not isinstance(setting, dict):
550
                raise TypeError(
551
                    "Expected a record containing `uid`, `hidden` and"
552
                    "`partition`, got %s" % type(setting))
553
            uid = api.get_uid(setting.get("uid"))
554
            hidden = setting.get("hidden", api.get_object(uid).getHidden())
555
            part_id = setting.get("part_id", "")
556
557
            if not uid:
558
                raise ValueError("UID is missing in setting %r" % setting)
559
560
            record = by_uid.get(api.get_uid(uid))
561
            if not record:
562
                continue
563
            record["hidden"] = hidden
564
            record["part_id"] = part_id
565
566
        # set back the new services
567
        self.setServices(by_uid.values())
568
569
    @security.protected(permissions.View)
570
    def getAnalysisServiceSettings(self, service):
571
        """Returns the settings for a single service
572
        """
573
        uid = api.get_uid(service)
574
        by_uid = self.get_services_by_uid()
575
        record = by_uid.get(uid, {
576
            "uid": uid,
577
            "part_id": "",
578
            "hidden": False,
579
        })
580
        return record
581
582
    @security.protected(permissions.View)
583
    def isAnalysisServiceHidden(self, service):
584
        """Check if the service is configured as hidden
585
        """
586
        uid = api.get_uid(service)
587
        services = self.get_services_by_uid()
588
        record = services.get(uid)
589
        if not record:
590
            obj = api.get_object(service)
591
            return obj.getRawHidden()
592
        return record.get("hidden", False)
593
594
    @security.protected(permissions.View)
595
    def getAnalysisServicePartitionID(self, service):
596
        """Get the assigned parition for the service
597
        """
598
        uid = api.get_uid(service)
599
        services = self.get_services_by_uid()
600
        record = services.get(uid)
601
        if not record:
602
            return ""
603
        return record.get("part_id", "")
604
605
    @security.protected(permissions.View)
606
    def getAnalysisServiceUIDs(self):
607
        """Returns a list of all assigned service UIDs
608
        """
609
        services = self.getRawServices()
610
        return list(map(lambda record: record.get("uid"), services))
611
612
    @security.protected(permissions.View)
613
    def get_services_by_uid(self):
614
        """Return the selected services grouped by UID
615
        """
616
        records = {}
617
        for record in self.services:
618
            records[record.get("uid")] = record
619
        return records
620
621 View Code Duplication
    def remove_service(self, service):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
622
        """Remove the service from the template
623
624
        If the service is not selected in the profile, returns False.
625
626
        NOTE: This method is used when an Analysis Service was deactivated.
627
628
        :param service: The service to be removed from this template
629
        :type service: AnalysisService
630
        :return: True if the AnalysisService has been removed successfully
631
        """
632
        # get the UID of the service that should be removed
633
        uid = api.get_uid(service)
634
        # get the current raw value of the services field.
635
        current_services = self.getRawServices()
636
        # filter out the UID of the service
637
        new_services = filter(
638
            lambda record: record.get("uid") != uid, current_services)
639
640
        # check if the service was removed or not
641
        current_services_count = len(current_services)
642
        new_services_count = len(new_services)
643
644
        if current_services_count == new_services_count:
645
            # service was not part of the profile
646
            return False
647
648
        # set the new services
649
        self.setServices(new_services)
650
651
        return True
652