Passed
Push — 2.x ( ca0980...375d4d )
by Ramon
08:08
created

bika.lims.utils.analysisrequest.to_services_uids()   A

Complexity

Conditions 4

Size

Total Lines 28
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 15
dl 0
loc 28
rs 9.65
c 0
b 0
f 0
cc 4
nop 2
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2024 by its authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import itertools
22
from collections import OrderedDict
23
from string import Template
24
25
import six
26
from bika.lims import api
27
from bika.lims import bikaMessageFactory as _
28
from bika.lims import logger
29
from bika.lims.api.mail import compose_email
30
from bika.lims.api.mail import is_valid_email_address
31
from bika.lims.api.mail import send_email
32
from bika.lims.interfaces import IAnalysisRequest
33
from bika.lims.interfaces import IAnalysisRequestRetest
34
from bika.lims.interfaces import IAnalysisRequestSecondary
35
from bika.lims.interfaces import IAnalysisService
36
from bika.lims.interfaces import IReceived
37
from bika.lims.interfaces import IRoutineAnalysis
38
from bika.lims.utils import changeWorkflowState
39
from bika.lims.utils import copy_field_values
40
from bika.lims.utils import get_link
41
from bika.lims.utils import tmpID
42
from bika.lims.workflow import doActionFor
43
from DateTime import DateTime
44
from Products.Archetypes.event import ObjectInitializedEvent
45
from Products.CMFPlone.utils import _createObjectByType
46
from senaite.core.catalog import SETUP_CATALOG
47
from senaite.core.idserver import renameAfterCreation
48
from senaite.core.permissions.sample import can_receive
49
from senaite.core.workflow import ANALYSIS_WORKFLOW
50
from senaite.core.workflow import SAMPLE_WORKFLOW
51
from zope import event
52
from zope.interface import alsoProvides
53
54
55
def create_analysisrequest(client, request, values, analyses=None,
                           results_ranges=None, prices=None):
    """Creates a new AnalysisRequest (a Sample) object inside the client

    :param client: The container where the Sample will be created
    :param request: The current Http Request object
    :param values: A dict, with keys as AnalysisRequest's schema field names
    :param analyses: List of Services or Analyses (brains, objects, UIDs,
        keywords). Extends the list from values["Analyses"]
    :param results_ranges: List of Results Ranges. Extends the results ranges
        from the Specification object defined in values["Specification"]
    :param prices: Mapping of AnalysisService UID -> price. If not set, prices
        are read from the associated analysis service.
    :returns: the newly created sample
    """
    # Work on a shallow copy, so the dict passed in is not polluted
    values = {key: value for key, value in values.items()}

    # The client passed in explicitly takes precedence over the one that
    # might come with the values, e.g. when this function is called
    # programmatically outside of the sample add form.
    values["Client"] = client

    # Resolve the service uids of the analyses to be added in the sample.
    # Items might not be uids and additional analyses can come from either
    # the values or the `analyses` parameter
    service_uids = to_services_uids(services=analyses, values=values)

    # Analyses are assigned manually later, so blank them out in values
    values["Analyses"] = []

    # Take the specification out to set it *after* the analyses are added
    specification = values.pop("Specification", None)

    # Create the sample, flagged as temporary to avoid indexing
    sample = _createObjectByType("AnalysisRequest", client, tmpID())
    api.mark_temporary(sample)

    # NOTE: `_processForm` (with underscore) is called here because the
    #       creation flag is unmarked and the `ObjectInitializedEvent` is
    #       notified manually at the end of this function
    sample._processForm(REQUEST=request, values=values)

    # Assign the analyses first and the specification afterwards
    sample.setAnalyses(service_uids, prices=prices, specs=results_ranges)
    if specification:
        sample.setSpecification(specification)

    # Handle hidden analyses from template and profiles
    # https://github.com/senaite/senaite.core/issues/1437
    # https://github.com/senaite/senaite.core/issues/1326
    apply_hidden_services(sample)

    # Store the rejection reasons in the format the sample expects
    rejection_reasons = resolve_rejection_reasons(values)
    sample.setRejectionReasons(rejection_reasons)

    # Secondary sample: inherit the dates from its primary
    primary = sample.getPrimaryAnalysisRequest()
    if primary:
        alsoProvides(sample, IAnalysisRequestSecondary)
        sample.setDateSampled(primary.getDateSampled())
        sample.setSamplingDate(primary.getSamplingDate())

        # Receive the secondary only if the primary has a reception date
        primary_received = primary.getDateReceived()
        if primary_received:
            receive_sample(sample, date_received=primary_received)

    # Partition: always received, with the reception date of the parent
    parent = sample.getParentAnalysisRequest()
    if parent:
        receive_sample(sample, date_received=parent.getDateReceived())

    # Choose the initial status for samples that are not received yet
    if not IReceived.providedBy(sample):
        auto_receive = api.get_setup().getAutoreceiveSamples()

        if sample.getSamplingRequired():
            # sample has not been collected yet
            changeWorkflowState(sample, SAMPLE_WORKFLOW, "to_be_sampled",
                                action="to_be_sampled")

        elif auto_receive and sample.getDateSampled() and can_receive(sample):
            # auto-receive, but only when a DateSampled is set and the
            # current user (might be a client) is allowed to receive
            receive_sample(sample)

        else:
            # sample_due is the default initial status of the sample
            changeWorkflowState(sample, SAMPLE_WORKFLOW, "sample_due",
                                action="no_sampling_workflow")

    # Finalize the sample: assign the definitive id, unmark the creation
    # flag (AT only), remove the temporary marker and index explicitly
    renameAfterCreation(sample)
    sample.unmarkCreationFlag()
    api.unmark_temporary(sample)
    api.catalog_object(sample)

    # notify object initialization (also creates a snapshot)
    event.notify(ObjectInitializedEvent(sample))

    # Reject the sample automatically if rejection reasons have been set
    if rejection_reasons:
        do_rejection(sample)

    return sample
169
170
171
def receive_sample(sample, check_permission=False, date_received=None):
    """Sets the sample to "sample_received" status without a transition

    :param sample: the sample to be received
    :param check_permission: when True, the sample is only received if
        `can_receive` returns a true value for it
    :param date_received: the reception date to store. The current date and
        time is used if not set
    :returns: False if the permission check failed, True otherwise
    """
    # NOTE: In `sample_registered` state we do not grant any roles the
    #       permission to receive a sample! Not sure if this can be ignored
    #       when the LIMS is configured to auto-receive samples?
    if check_permission:
        if not can_receive(sample):
            return False

    # Change the status directly and flag the sample as received
    changeWorkflowState(sample, SAMPLE_WORKFLOW, "sample_received",
                        action="receive")
    alsoProvides(sample, IReceived)

    # Manually set the received date, falling back to now
    sample.setDateReceived(date_received or DateTime())

    # Initialize the analyses
    # NOTE: `objectValues` is used instead of `getAnalyses`, because the
    #       analyses are not yet indexed!
    contained = sample.objectValues()
    for analysis in (obj for obj in contained if obj.portal_type == "Analysis"):
        changeWorkflowState(analysis, ANALYSIS_WORKFLOW, "unassigned",
                            action="initialize")

    return True
201
202
203
def apply_hidden_services(sample):
    """Applies the hidden setting to the analyses of the sample

    The service uids flagged as hidden are collected from the template and
    from the profiles assigned to the sample. Every analysis of the sample
    whose service uid is among them is set as hidden.

    :param sample: the sample that contains the analyses
    """
    # Collect the "hidden" service uids from the template. The result is
    # copied into a set, so it does not matter whether the helper returns a
    # list or a lazy iterator, and membership checks below are cheap
    hidden = set(get_hidden_service_uids(sample.getTemplate()))

    # Add the "hidden" service uids from the profiles
    for profile in sample.getProfiles():
        hidden.update(get_hidden_service_uids(profile))

    if not hidden:
        # nothing to hide, no need to look up the analyses
        return

    if api.is_temporary(sample):
        # sample is in create process. Just use the object values.
        analyses = sample.objectValues(spec="Analysis")
    else:
        analyses = sample.getAnalyses(full_objects=True)

    # Update the sample analyses
    for analysis in analyses:
        if analysis.getServiceUID() in hidden:
            analysis.setHidden(True)
230
231
232
def get_hidden_service_uids(profile_or_template):
    """Returns a list of service uids that are set as hidden

    :param profile_or_template: ARTemplate or AnalysisProfile object
    :returns: list with the "uid" of each service setting that has a true
        "hidden" value. Empty list if no object is passed in
    """
    if not profile_or_template:
        return []
    settings = profile_or_template.getAnalysisServicesSettings() or []
    # Build a real list (not a lazy map/filter), so callers can safely
    # extend, index or iterate it more than once
    return [item["uid"] for item in settings if item.get("hidden", False)]
241
242
243
def to_services_uids(services=None, values=None):
    """Returns a list of Analysis Services UIDS

    :param services: A list of service items (uid, keyword, brain, obj, title)
    :param values: a dict, where keys are AR|Sample schema field names.
    :returns: a list of Analyses Services UIDs, without duplicates and in
        order of first appearance
    """
    def to_list(value):
        """Returns the value as a new list. Strings are wrapped, lists and
        tuples are copied and anything else is discarded with a warning
        """
        if not value:
            return []
        if isinstance(value, six.string_types):
            return [value]
        if isinstance(value, (list, tuple)):
            # always hand out a list, so the results can be concatenated
            # regardless of the original sequence type
            return list(value)
        logger.warn("Cannot convert to a list: {}".format(value))
        return []

    services = services or []
    values = values or {}

    # Merge the services passed in and those from values into one list
    items = to_list(services) + to_list(values.get("Analyses"))

    # Convert them to service uids, skipping those that cannot be resolved
    uids = [uid for uid in (to_service_uid(item) for item in items) if uid]

    # Get the service uids without duplicates, but preserving the order
    return list(OrderedDict.fromkeys(uids).keys())
271
272
273
def to_service_uid(uid_brain_obj_str):
    """Resolves the element passed in to the uid of an Analysis Service

    The element can be a uid, an object or brain (Analysis Service or routine
    Analysis), or a string with the keyword or the title of a service.

    :returns: the uid, or None if the value cannot be resolved to a valid uid
    """
    item = uid_brain_obj_str

    # Already a uid ("0" is not accepted)
    if api.is_uid(item) and item != "0":
        return item

    # An object or brain
    if api.is_object(item):
        obj = api.get_object(item)
        if IAnalysisService.providedBy(obj):
            return api.get_uid(obj)
        if IRoutineAnalysis.providedBy(obj):
            return obj.getServiceUID()
        logger.error("Type not supported: {}".format(obj.portal_type))
        return None

    if not isinstance(item, six.string_types):
        return None

    # Maybe a keyword, or maybe a title? Only an unambiguous match counts
    for index in ("getKeyword", "title"):
        query = {"portal_type": "AnalysisService", index: item}
        brains = api.search(query, SETUP_CATALOG)
        if len(brains) == 1:
            return api.get_uid(brains[0])

    return None
307
308
309
def create_retest(ar):
    """Creates a retest (Analysis Request) from an invalidated Analysis Request

    The retest is created in the same container as the source, gets a copy of
    its field values and of its analyses (except retests and retracted ones)
    and is set to "sample_received" status.

    :param ar: The invalidated Analysis Request
    :type ar: IAnalysisRequest
    :rtype: IAnalysisRequest
    :raises ValueError: if the source is empty, is not an IAnalysisRequest,
        already has a retest assigned or is not invalidated
    """
    if not ar:
        raise ValueError("Source Analysis Request cannot be None")

    if not IAnalysisRequest.providedBy(ar):
        raise ValueError("Type not supported: {}".format(repr(type(ar))))

    if ar.getRetest():
        # Do not allow the creation of another retest!
        raise ValueError("Retest already set")

    if not ar.isInvalid():
        # Analysis Request must be in 'invalid' state
        raise ValueError(
            "Cannot do a retest from a non-invalidated Analysis Request")

    # Create the Retest (Analysis Request)
    ignore = ['Analyses', 'DatePublished', 'Invalidated', 'Sample', 'Remarks']
    retest = _createObjectByType("AnalysisRequest", ar.aq_parent, tmpID())
    copy_field_values(ar, retest, ignore_fieldnames=ignore)

    # Mark the retest with the `IAnalysisRequestRetest` interface
    alsoProvides(retest, IAnalysisRequestRetest)

    # Assign the source to retest
    retest.setInvalidated(ar)

    # Rename the retest according to the ID server setup
    renameAfterCreation(retest)

    # Copy the analyses from the source
    intermediate_states = ['retracted', ]
    ignore_fieldnames = ['DataAnalysisPublished']
    for an in ar.getAnalyses(full_objects=True):
        # skip retests
        if an.isRetest():
            continue

        if api.get_workflow_status_of(an) in intermediate_states:
            # Exclude intermediate analyses
            continue

        # Original sample might have multiple copies of same analysis: count
        # those already copied to the retest and use the count as id suffix.
        # A real list is built, so truthiness and len() behave the same
        # regardless of what `filter` returns in the running Python version
        keyword = an.getKeyword()
        copied = [ret for ret in retest.getAnalyses(full_objects=True)
                  if ret.getKeyword() == keyword]
        analysis_id = keyword
        if copied:
            analysis_id = '{}-{}'.format(keyword, len(copied))

        # Create the analysis retest
        nan = _createObjectByType("Analysis", retest, analysis_id)

        # Make a copy
        copy_field_values(an, nan, ignore_fieldnames=ignore_fieldnames)
        nan.unmarkCreationFlag()
        nan.reindexObject()

    # Transition the retest to "sample_received"!
    changeWorkflowState(retest, SAMPLE_WORKFLOW, 'sample_received')
    alsoProvides(retest, IReceived)

    # Initialize analyses
    for analysis in retest.getAnalyses(full_objects=True):
        if not IRoutineAnalysis.providedBy(analysis):
            continue
        changeWorkflowState(analysis, ANALYSIS_WORKFLOW, "unassigned")

    # Reindex and other stuff
    retest.reindexObject()
    retest.aq_parent.reindexObject()

    return retest
385
386
387
def create_partition(analysis_request, request, analyses, sample_type=None,
                     container=None, preservation=None, skip_fields=None,
                     internal_use=True):
    """
    Creates a partition for the analysis_request (primary) passed in
    :param analysis_request: uid/brain/object of IAnalysisRequest type
    :param request: the current request object
    :param analyses: uids/brains/objects of IAnalysis type
    :param sample_type: uid/brain/object of SampleType. The value from the
        primary is kept if None. Set to empty if another false value
    :param container: uid/brain/object of Container. Same rules as for
        sample_type apply
    :param preservation: uid/brain/object of SamplePreservation. Same rules
        as for sample_type apply
    :param skip_fields: names of fields to be skipped on copy from primary
    :param internal_use: value for the "InternalUse" field of the partition
    :return: the new partition
    """
    partition_skip_fields = set([
        "Analyses",
        "Attachment",
        "Client",
        "DetachedFrom",
        "Profile",
        "Profiles",
        "RejectionReasons",
        "Remarks",
        "ResultsInterpretation",
        "ResultsInterpretationDepts",
        "Sample",
        "Template",
        "ParentAnalysisRequest",
        "PrimaryAnalysisRequest",
        # default fields
        "id",
        "description",
        "allowDiscussion",
        "subject",
        "location",
        "contributors",
        "creators",
        "effectiveDate",
        "expirationDate",
        "language",
        "rights",
        "creation_date",
        "modification_date",
    ])
    if skip_fields:
        partition_skip_fields.update(skip_fields)

    # Copy field values from the primary analysis request
    ar = api.get_object(analysis_request)
    record = fields_to_dict(ar, partition_skip_fields)

    # Update with values that are partition-specific
    record.update({
        "InternalUse": internal_use,
        "ParentAnalysisRequest": api.get_uid(ar),
    })

    # None means "keep the value copied from the primary", while any other
    # false value resets the field to empty
    if sample_type is not None:
        record["SampleType"] = api.get_uid(sample_type) if sample_type else ""
    if container is not None:
        record["Container"] = api.get_uid(container) if container else ""
    if preservation is not None:
        record["Preservation"] = (
            api.get_uid(preservation) if preservation else "")

    # Resolve the analyses without duplicates, but preserving the order they
    # were passed in, and get a real list with their services
    client = ar.getClient()
    analyses = list(OrderedDict.fromkeys(map(api.get_object, analyses)))
    services = [analysis.getAnalysisService() for analysis in analyses]

    # Populate the root's ResultsRanges to partitions
    results_ranges = ar.getResultsRange() or []

    # Create the Partition
    partition = create_analysisrequest(client,
                                       request=request,
                                       values=record,
                                       analyses=services,
                                       results_ranges=results_ranges)

    # Reindex Parent Analysis Request
    ar.reindexObject(idxs=["isRootAncestor"])

    return partition
471
472
473
def fields_to_dict(obj, skip_fields=None):
    """
    Returns a dict that maps the field names of the object passed in to the
    value each field returns for it. Fields listed in skip_fields and fields
    of type "computed" are left out
    """
    obj = api.get_object(obj)
    skip = skip_fields or ()
    return {
        name: field.get(obj)
        for name, field in api.get_fields(obj).items()
        if name not in skip and field.type != "computed"
    }
487
488
489
def resolve_rejection_reasons(values):
    """Converts the rejection reasons from the submitted values to the format
    supported by Sample's Rejection Reason field

    :param values: dict that might contain a "RejectionReasons" entry
    :returns: a list with a single dict with "selected" and "other" keys, or
        an empty list if the entry is toggled off or no reasons are set
    """
    submitted = values.get("RejectionReasons")
    if not submitted:
        return []

    # XXX RejectionReasons returns a list with a single dict
    entry = submitted[0] or {}

    # Nothing to do if the reasons entry is toggled off
    if entry.get("checkbox") != "on":
        return []

    # Predefined reasons and free-text reason, if any
    selected = entry.get("multiselection") or []
    other = entry.get("other") or ""

    # Return empty if neither selected nor other reasons are set
    if not selected and not other:
        return []

    return [{"selected": selected, "other": other}]
514
515
516
def do_rejection(sample, notify=None):
    """Rejects the sample and, on success, attaches the rejection pdf to it
    and sends a notification email.

    :param sample: the sample to reject
    :param notify: whether the notification email has to be sent. If None,
        the setting from Setup decides
    """
    sample_id = api.get_id(sample)

    # Rejection reasons are mandatory
    if not sample.getRejectionReasons():
        logger.warn("Cannot reject {} w/o rejection reasons".format(sample_id))
        return

    succeeded, _message = doActionFor(sample, "reject")
    if not succeeded:
        logger.warn("Cannot reject the sample {}".format(sample_id))
        return

    # Generate a pdf with the rejection reasons and attach it to the sample
    attachment = sample.createAttachment(
        get_rejection_pdf(sample),
        filename="{}-rejected.pdf".format(sample_id))
    pdf_file = attachment.getAttachmentFile()

    # Fall back to the setting from Setup if not told explicitly
    if notify is None:
        notify = api.get_setup().getNotifyOnSampleRejection()

    if not notify:
        return

    # Compose the email and send it, if it could be composed
    mime_msg = get_rejection_mail(sample, pdf_file)
    if mime_msg:
        send_email(mime_msg)
550
551
552
def get_rejection_pdf(sample):
    """Returns the pdf of the rejection report rendered for the sample
    """
    # Imported at function level to avoid circular dependencies
    from senaite.core.browser.samples.rejection.report import RejectionReport

    # Render the rejection document for the current request and convert it
    report = RejectionReport(sample, api.get_request())
    pdf = report.to_pdf()
    return pdf
561
562
563
def get_rejection_email_recipients(sample):
    """Returns the list of valid email addresses the rejection report has to
    be sent to: those from the contact and CC contacts of the sample, plus
    the CC emails
    """
    # email addresses from the contacts, skipping empty contacts
    contacts = [sample.getContact()] + sample.getCCContact()
    emails = [contact.getEmailAddress() for contact in contacts if contact]

    # extend with the CC emails
    emails = emails + sample.getCCEmails(as_list=True)

    # keep the valid addresses only
    return [email for email in emails if is_valid_email_address(email)]
575
576
577
578
def get_rejection_mail(sample, rejection_pdf=None):
    """Generates an email to sample contacts with rejection reasons

    :param sample: the rejected sample
    :param rejection_pdf: file to attach to the email, if any
    :returns: the composed email, or None if the sample has no valid
        recipients
    """
    # Get the reasons. Only the first item of the stored value is considered
    rejection = sample.getRejectionReasons()
    rejection = (rejection[0] if rejection else None) or {}

    # Tolerate a missing, empty or non-list value for the selected reasons
    selected = list(rejection.get("selected") or [])
    reasons = [reason for reason in selected + [rejection.get("other")]
               if reason]
    reasons = "<br/>- ".join(reasons)

    # Render the email body
    setup = api.get_setup()
    lab = setup.laboratory
    lab_address = lab.getPrintAddress()
    email_body = Template(setup.getEmailBodySampleRejection())
    email_body = email_body.safe_substitute({
        "lab_address": "<br/>".join(lab_address),
        "reasons": "<br/>-{}".format(reasons) if reasons else "",
        "sample_id": api.get_id(sample),
        "sample_link": get_link(api.get_url(sample), api.get_id(sample))
    })

    # Get the recipients
    _to = get_rejection_email_recipients(sample)
    if not _to:
        # Cannot send an e-mail without recipient!
        logger.warn("No valid recipients for {}".format(api.get_id(sample)))
        return None

    attachments = [rejection_pdf] if rejection_pdf else []

    return compose_email(
        from_addr=lab.getEmailAddress(),
        to_addr=_to,
        subj=_("%s has been rejected") % api.get_id(sample),
        body=email_body,
        html=True,
        attachments=attachments)
624