bika.lims.utils.analysisrequest.create_retest()   D
last analyzed

Complexity

Conditions 12

Size

Total Lines 76
Code Lines 40

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 40
dl 0
loc 76
rs 4.8
c 0
b 0
f 0
cc 12
nop 1

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like bika.lims.utils.analysisrequest.create_retest() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2025 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import itertools
22
from collections import OrderedDict
23
from string import Template
24
25
import six
26
from bika.lims import api
27
from bika.lims import bikaMessageFactory as _
28
from bika.lims import logger
29
from bika.lims.api.mail import compose_email
30
from bika.lims.api.mail import is_valid_email_address
31
from bika.lims.api.mail import send_email
32
from bika.lims.interfaces import IAnalysisRequest
33
from bika.lims.interfaces import IAnalysisRequestRetest
34
from bika.lims.interfaces import IAnalysisRequestSecondary
35
from bika.lims.interfaces import IAnalysisService
36
from bika.lims.interfaces import IReceived
37
from bika.lims.interfaces import IRoutineAnalysis
38
from bika.lims.utils import changeWorkflowState
39
from bika.lims.utils import copy_field_values
40
from bika.lims.utils import get_link
41
from bika.lims.utils import tmpID
42
from bika.lims.workflow import doActionFor
43
from DateTime import DateTime
44
from Products.Archetypes.event import ObjectInitializedEvent
45
from Products.CMFPlone.utils import _createObjectByType
46
from senaite.core.api.workflow import check_guard
47
from senaite.core.catalog import SETUP_CATALOG
48
from senaite.core.idserver import renameAfterCreation
49
from senaite.core.permissions.sample import can_receive
50
from senaite.core.workflow import ANALYSIS_WORKFLOW
51
from senaite.core.workflow import SAMPLE_WORKFLOW
52
from zope import event
53
from zope.interface import alsoProvides
54
55
56
def create_analysisrequest(client, request, values, analyses=None,
57
                           results_ranges=None, prices=None):
58
    """Creates a new AnalysisRequest (a Sample) object
59
    :param client: The container where the Sample will be created
60
    :param request: The current Http Request object
61
    :param values: A dict, with keys as AnalaysisRequest's schema field names
62
    :param analyses: List of Services or Analyses (brains, objects, UIDs,
63
        keywords). Extends the list from values["Analyses"]
64
    :param results_ranges: List of Results Ranges. Extends the results ranges
65
        from the Specification object defined in values["Specification"]
66
    :param prices: Mapping of AnalysisService UID -> price. If not set, prices
67
        are read from the associated analysis service.
68
    """
69
    # Don't pollute the dict param passed in
70
    values = dict(values.items())
71
72
    # Explicitly set client instead of relying on the passed in vales.
73
    # This might happen if this function is called programmatically outside of
74
    # the sample add form.
75
    values["Client"] = client
76
77
    # Resolve the Service uids of analyses to be added in the Sample. Values
78
    # passed-in might contain Profiles and also values that are not uids. Also,
79
    # additional analyses can be passed-in through either values or services
80
    service_uids = to_service_uids(services=analyses, values=values)
81
82
    # Remove the Analyses from values. We will add them manually
83
    values.update({"Analyses": []})
84
85
    # Remove the specificaton to set it *after* the analyses have been added
86
    specification = values.pop("Specification", None)
87
88
    # Create the Analysis Request and submit the form
89
    ar = _createObjectByType("AnalysisRequest", client, tmpID())
90
    # mark the sample as temporary to avoid indexing
91
    api.mark_temporary(ar)
92
    # NOTE: We call here `_processForm` (with underscore) to manually unmark
93
    #       the creation flag and trigger the `ObjectInitializedEvent`, which
94
    #       is used for snapshot creation.
95
    ar._processForm(REQUEST=request, values=values)
96
97
    # Set the analyses manually
98
    ar.setAnalyses(service_uids, prices=prices, specs=results_ranges)
99
100
    # Explicitly set the specification to the sample
101
    if specification:
102
        ar.setSpecification(specification)
103
104
    # Handle hidden analyses from template and profiles
105
    # https://github.com/senaite/senaite.core/issues/1437
106
    # https://github.com/senaite/senaite.core/issues/1326
107
    apply_hidden_services(ar)
108
109
    # Handle rejection reasons
110
    rejection_reasons = resolve_rejection_reasons(values)
111
    ar.setRejectionReasons(rejection_reasons)
112
113
    # Handle secondary Analysis Request
114
    primary = ar.getPrimaryAnalysisRequest()
115
    if primary:
116
        # Mark the secondary with the `IAnalysisRequestSecondary` interface
117
        alsoProvides(ar, IAnalysisRequestSecondary)
118
119
        # Set dates to match with those from the primary
120
        ar.setDateSampled(primary.getDateSampled())
121
        ar.setSamplingDate(primary.getSamplingDate())
122
123
        # Force the transition of the secondary to received and set the
124
        # description/comment in the transition accordingly.
125
        date_received = primary.getDateReceived()
126
        if date_received:
127
            receive_sample(ar, date_received=date_received)
128
129
    parent_sample = ar.getParentAnalysisRequest()
130
    if parent_sample:
131
        # Always set partition to received
132
        date_received = parent_sample.getDateReceived()
133
        receive_sample(ar, date_received=date_received)
134
135
    if not IReceived.providedBy(ar):
136
        setup = api.get_setup()
137
        auto_receive = setup.getAutoreceiveSamples()
138
        if ar.getSamplingRequired():
139
            # sample has not been collected yet
140
            changeWorkflowState(ar, SAMPLE_WORKFLOW, "to_be_sampled",
141
                                action="to_be_sampled")
142
143
        elif auto_receive and can_receive(ar) and check_guard(ar, "receive"):
144
            # auto-receive the sample, but only if the user (that might be
145
            # a client) has enough privileges and the sample has a value set
146
            # for DateSampled. Otherwise, sample_due
147
            receive_sample(ar)
148
149
        else:
150
            # sample_due is the default initial status of the sample
151
            changeWorkflowState(ar, SAMPLE_WORKFLOW, "sample_due",
152
                                action="no_sampling_workflow")
153
154
    renameAfterCreation(ar)
155
    # AT only
156
    ar.unmarkCreationFlag()
157
    # unmark the sample as temporary
158
    api.unmark_temporary(ar)
159
    # explicit reindexing after sample finalization
160
    api.catalog_object(ar)
161
    # notify object initialization (also creates a snapshot)
162
    event.notify(ObjectInitializedEvent(ar))
163
164
    # If rejection reasons have been set, reject the sample automatically
165
    if rejection_reasons:
166
        do_rejection(ar)
167
168
    return ar
169
170
171
def receive_sample(sample, check_permission=False, date_received=None):
172
    """Receive the sample without transition
173
    """
174
175
    # NOTE: In `sample_registered` state we do not grant any roles the
176
    #       permission to receive a sample! Not sure if this can be ignored
177
    #       when the LIMS is configured to auto-receive samples?
178
    if check_permission and not can_receive(sample):
179
        return False
180
181
    changeWorkflowState(sample, SAMPLE_WORKFLOW, "sample_received",
182
                        action="receive")
183
184
    # Mark the secondary as received
185
    alsoProvides(sample, IReceived)
186
    # Manually set the received date
187
    if not date_received:
188
        date_received = DateTime()
189
    sample.setDateReceived(date_received)
190
191
    # Initialize analyses
192
    # NOTE: We use here `objectValues` instead of `getAnalyses`,
193
    #       because the Analyses are not yet indexed!
194
    for obj in sample.objectValues():
195
        if obj.portal_type != "Analysis":
196
            continue
197
        # https://github.com/senaite/senaite.core/pull/2650
198
        # NOTE: We trigger event handlers for analyses to keep it consistent
199
        # with the behavior when manually received.
200
        changeWorkflowState(obj, ANALYSIS_WORKFLOW, "unassigned",
201
                            action="initialize", trigger_events=True)
202
203
    return True
204
205
206
def apply_hidden_services(sample):
207
    """
208
    Applies the hidden setting to the sample analyses in accordance with the
209
    settings from its template and/or profiles
210
    :param sample: the sample that contains the analyses
211
    """
212
    hidden = list()
213
214
    # Get the "hidden" service uids from the template
215
    template = sample.getTemplate()
216
    hidden = get_hidden_service_uids(template)
217
218
    # Get the "hidden" service uids from profiles
219
    profiles = sample.getProfiles()
220
    hid_profiles = map(get_hidden_service_uids, profiles)
221
    hid_profiles = list(itertools.chain(*hid_profiles))
222
    hidden.extend(hid_profiles)
223
224
    # Update the sample analyses
225
    if api.is_temporary(sample):
226
        # sample is in create process. Just return the object values.
227
        analyses = sample.objectValues(spec="Analysis")
228
    else:
229
        analyses = sample.getAnalyses(full_objects=True)
230
    analyses = filter(lambda an: an.getServiceUID() in hidden, analyses)
231
    for analysis in analyses:
232
        analysis.setHidden(True)
233
234
235
def get_hidden_service_uids(profile_or_template):
236
    """Returns a list of service uids that are set as hidden
237
    :param profile_or_template: ARTemplate or AnalysisProfile object
238
    """
239
    if not profile_or_template:
240
        return []
241
    settings = profile_or_template.getAnalysisServicesSettings()
242
    hidden = filter(lambda ser: ser.get("hidden", False), settings)
243
    return map(lambda setting: setting["uid"], hidden)
244
245
246
def to_service_uids(services=None, values=None):
247
    """Returns a list of Analysis Services UIDS
248
249
    :param services: A list of service items (uid, keyword, brain, obj, title)
250
    :param values: a dict, where keys are AR|Sample schema field names.
251
    :returns: a list of Analyses Services UIDs
252
    """
253
    def to_list(value):
254
        if not value:
255
            return []
256
        if isinstance(value, six.string_types):
257
            return [value]
258
        if isinstance(value, (list, tuple)):
259
            return value
260
        logger.warn("Cannot convert to a list: {}".format(value))
261
        return []
262
263
    services = services or []
264
    values = values or {}
265
266
    # Merge analyses from analyses_serv and values into one list
267
    uids = to_list(services) + to_list(values.get("Analyses"))
268
269
    # Convert them to a list of service uids
270
    uids = filter(None, map(to_service_uid, uids))
271
272
    # Extract and append the service UIDs from the profiles
273
    for profile in to_list(values.get("Profiles", [])):
274
        profile = api.get_object(profile, None)
275
        if not profile:
276
            continue
277
        uids.extend(profile.getServiceUIDs())
278
279
    # Get the service uids without duplicates, but preserving the order
280
    return list(OrderedDict.fromkeys(uids).keys())
281
282
283
def to_service_uid(uid_brain_obj_str):
284
    """Resolves the passed in element to a valid uid. Returns None if the value
285
    cannot be resolved to a valid uid
286
    """
287
    if api.is_uid(uid_brain_obj_str) and uid_brain_obj_str != "0":
288
        return uid_brain_obj_str
289
290
    if api.is_object(uid_brain_obj_str):
291
        obj = api.get_object(uid_brain_obj_str)
292
293
        if IAnalysisService.providedBy(obj):
294
            return api.get_uid(obj)
295
296
        elif IRoutineAnalysis.providedBy(obj):
297
            return obj.getServiceUID()
298
299
        else:
300
            logger.error("Type not supported: {}".format(obj.portal_type))
301
            return None
302
303
    if isinstance(uid_brain_obj_str, six.string_types):
304
        # Maybe is a keyword?
305
        query = dict(portal_type="AnalysisService", getKeyword=uid_brain_obj_str)
306
        brains = api.search(query, SETUP_CATALOG)
307
        if len(brains) == 1:
308
            return api.get_uid(brains[0])
309
310
        # Or maybe a title
311
        query = dict(portal_type="AnalysisService", title=uid_brain_obj_str)
312
        brains = api.search(query, SETUP_CATALOG)
313
        if len(brains) == 1:
314
            return api.get_uid(brains[0])
315
316
    return None
317
318
319
def create_retest(ar):
320
    """Creates a retest (Analysis Request) from an invalidated Analysis Request
321
    :param ar: The invalidated Analysis Request
322
    :type ar: IAnalysisRequest
323
    :rtype: IAnalysisRequest
324
    """
325
    if not ar:
326
        raise ValueError("Source Analysis Request cannot be None")
327
328
    if not IAnalysisRequest.providedBy(ar):
329
        raise ValueError("Type not supported: {}".format(repr(type(ar))))
330
331
    if ar.getRetest():
332
        # Do not allow the creation of another retest!
333
        raise ValueError("Retest already set")
334
335
    if not ar.isInvalid():
336
        # Analysis Request must be in 'invalid' state
337
        raise ValueError("Cannot do a retest from an invalid Analysis Request")
338
339
    # Create the Retest (Analysis Request)
340
    ignore = ['Analyses', 'DatePublished', 'Invalidated', 'Sample', 'Remarks']
341
    retest = _createObjectByType("AnalysisRequest", ar.aq_parent, tmpID())
342
    copy_field_values(ar, retest, ignore_fieldnames=ignore)
343
344
    # Mark the retest with the `IAnalysisRequestRetest` interface
345
    alsoProvides(retest, IAnalysisRequestRetest)
346
347
    # Assign the source to retest
348
    retest.setInvalidated(ar)
349
350
    # Rename the retest according to the ID server setup
351
    renameAfterCreation(retest)
352
353
    # Copy the analyses from the source
354
    intermediate_states = ['retracted', ]
355
    for an in ar.getAnalyses(full_objects=True):
356
        # skip retests
357
        if an.isRetest():
358
            continue
359
360
        if api.get_workflow_status_of(an) in intermediate_states:
361
            # Exclude intermediate analyses
362
            continue
363
364
        # Original sample might have multiple copies of same analysis
365
        keyword = an.getKeyword()
366
        analyses = retest.getAnalyses(full_objects=True)
367
        analyses = filter(lambda ret: ret.getKeyword() == keyword, analyses)
0 ignored issues
show
introduced by
The variable keyword does not seem to be defined for all execution paths.
Loading history...
368
        if analyses:
369
            keyword = '{}-{}'.format(keyword, len(analyses))
370
371
        # Create the analysis retest
372
        nan = _createObjectByType("Analysis", retest, keyword)
373
374
        # Make a copy
375
        ignore_fieldnames = ['DataAnalysisPublished']
376
        copy_field_values(an, nan, ignore_fieldnames=ignore_fieldnames)
377
        nan.unmarkCreationFlag()
378
        nan.reindexObject()
379
380
    # Transition the retest to "sample_received"!
381
    changeWorkflowState(retest, SAMPLE_WORKFLOW, 'sample_received')
382
    alsoProvides(retest, IReceived)
383
384
    # Initialize analyses
385
    for analysis in retest.getAnalyses(full_objects=True):
386
        if not IRoutineAnalysis.providedBy(analysis):
387
            continue
388
        changeWorkflowState(analysis, ANALYSIS_WORKFLOW, "unassigned")
389
390
    # Reindex and other stuff
391
    retest.reindexObject()
392
    retest.aq_parent.reindexObject()
393
394
    return retest
395
396
397
def create_partition(analysis_request, request, analyses, sample_type=None,
398
                     container=None, preservation=None, skip_fields=None,
399
                     internal_use=True):
400
    """
401
    Creates a partition for the analysis_request (primary) passed in
402
    :param analysis_request: uid/brain/object of IAnalysisRequest type
403
    :param request: the current request object
404
    :param analyses: uids/brains/objects of IAnalysis type
405
    :param sampletype: uid/brain/object of SampleType
406
    :param container: uid/brain/object of Container
407
    :param preservation: uid/brain/object of SamplePreservation
408
    :param skip_fields: names of fields to be skipped on copy from primary
409
    :return: the new partition
410
    """
411
    partition_skip_fields = [
412
        "Analyses",
413
        "Attachment",
414
        "Client",
415
        "DetachedFrom",
416
        "Profile",
417
        "Profiles",
418
        "RejectionReasons",
419
        "Remarks",
420
        "ResultsInterpretation",
421
        "ResultsInterpretationDepts",
422
        "Sample",
423
        "Template",
424
        "creation_date",
425
        "modification_date",
426
        "ParentAnalysisRequest",
427
        "PrimaryAnalysisRequest",
428
        # default fields
429
        "id",
430
        "description",
431
        "allowDiscussion",
432
        "subject",
433
        "location",
434
        "contributors",
435
        "creators",
436
        "effectiveDate",
437
        "expirationDate",
438
        "language",
439
        "rights",
440
        "creation_date",
441
        "modification_date",
442
    ]
443
    if skip_fields:
444
        partition_skip_fields.extend(skip_fields)
445
        partition_skip_fields = list(set(partition_skip_fields))
446
447
    # Copy field values from the primary analysis request
448
    ar = api.get_object(analysis_request)
449
    record = fields_to_dict(ar, partition_skip_fields)
450
451
    # Update with values that are partition-specific
452
    record.update({
453
        "InternalUse": internal_use,
454
        "ParentAnalysisRequest": api.get_uid(ar),
455
    })
456
    if sample_type is not None:
457
        record["SampleType"] = sample_type and api.get_uid(sample_type) or ""
458
    if container is not None:
459
        record["Container"] = container and api.get_uid(container) or ""
460
    if preservation is not None:
461
        record["Preservation"] = preservation and api.get_uid(preservation) or ""
462
463
    # Create the Partition
464
    client = ar.getClient()
465
    analyses = list(set(map(api.get_object, analyses)))
466
    services = map(lambda an: an.getAnalysisService(), analyses)
467
468
    # Populate the root's ResultsRanges to partitions
469
    results_ranges = ar.getResultsRange() or []
470
471
    partition = create_analysisrequest(client,
472
                                       request=request,
473
                                       values=record,
474
                                       analyses=services,
475
                                       results_ranges=results_ranges)
476
477
    # Reindex Parent Analysis Request
478
    ar.reindexObject(idxs=["isRootAncestor"])
479
480
    return partition
481
482
483
def fields_to_dict(obj, skip_fields=None):
484
    """
485
    Generates a dictionary with the field values of the object passed in, where
486
    keys are the field names. Skips computed fields
487
    """
488
    data = {}
489
    obj = api.get_object(obj)
490
    for field_name, field in api.get_fields(obj).items():
491
        if skip_fields and field_name in skip_fields:
492
            continue
493
        if field.type == "computed":
494
            continue
495
        data[field_name] = field.get(obj)
496
    return data
497
498
499
def resolve_rejection_reasons(values):
500
    """Resolves the rejection reasons from the submitted values to the format
501
    supported by Sample's Rejection Reason field
502
    """
503
    rejection_reasons = values.get("RejectionReasons")
504
    if not rejection_reasons:
505
        return []
506
507
    # XXX RejectionReasons returns a list with a single dict
508
    reasons = rejection_reasons[0] or {}
509
    if reasons.get("checkbox") != "on":
510
        # reasons entry is toggled off
511
        return []
512
513
    # Predefined reasons selected?
514
    selected = reasons.get("multiselection") or []
515
516
    # Other reasons set?
517
    other = reasons.get("other") or ""
518
519
    # If neither selected nor other reasons are set, return empty
520
    if any([selected, other]):
521
        return [{"selected": selected, "other": other}]
522
523
    return []
524
525
526
def do_rejection(sample, notify=None):
527
    """Rejects the sample and if succeeds, generates the rejection pdf and
528
    sends a notification email. If notify is None, the notification email will
529
    only be sent if the setting in Setup is enabled
530
    """
531
    sample_id = api.get_id(sample)
532
    if not sample.getRejectionReasons():
533
        logger.warn("Cannot reject {} w/o rejection reasons".format(sample_id))
534
        return
535
536
    success, msg = doActionFor(sample, "reject")
537
    if not success:
538
        logger.warn("Cannot reject the sample {}".format(sample_id))
539
        return
540
541
    # Generate a pdf with the rejection reasons
542
    pdf = get_rejection_pdf(sample)
543
544
    # Attach the PDF to the sample
545
    filename = "{}-rejected.pdf".format(sample_id)
546
    attachment = sample.createAttachment(pdf, filename=filename)
547
    pdf_file = attachment.getAttachmentFile()
548
549
    # Do we need to send a notification email?
550
    if notify is None:
551
        setup = api.get_setup()
552
        notify = setup.getNotifyOnSampleRejection()
553
554
    if notify:
555
        # Compose and send the email
556
        mime_msg = get_rejection_mail(sample, pdf_file)
557
        if mime_msg:
558
            # Send the email
559
            send_email(mime_msg)
560
561
562
def get_rejection_pdf(sample):
563
    """Generates a pdf with sample rejection reasons
564
    """
565
    # Avoid circular dependencies
566
    from senaite.core.browser.samples.rejection.report import RejectionReport
567
568
    # Render the html's rejection document
569
    tpl = RejectionReport(sample, api.get_request())
570
    return tpl.to_pdf()
571
572
573
def get_rejection_email_recipients(sample):
574
    """Returns a list with the email addresses to send the rejection report
575
    """
576
    # extract the emails from contacts
577
    contacts = [sample.getContact()] + sample.getCCContact()
578
    contacts = filter(None, contacts)
579
    emails = map(lambda contact: contact.getEmailAddress(), contacts)
580
581
    # extend with the CC emails
582
    emails = list(emails) + sample.getCCEmails(as_list=True)
583
    emails = filter(is_valid_email_address, emails)
584
    return list(emails)
585
586
587
588
def get_rejection_mail(sample, rejection_pdf=None):
589
    """Generates an email to sample contacts with rejection reasons
590
    """
591
    # Get the reasons
592
    reasons = sample.getRejectionReasons()
593
    reasons = reasons and reasons[0] or {}
594
    reasons = reasons.get("selected", []) + [reasons.get("other")]
595
    reasons = filter(None, reasons)
596
    reasons = "<br/>- ".join(reasons)
597
598
    # Render the email body
599
    setup = api.get_setup()
600
    lab_address = setup.laboratory.getPrintAddress()
601
    email_body = Template(setup.getEmailBodySampleRejection())
602
    email_body = email_body.safe_substitute({
603
        "lab_address": "<br/>".join(lab_address),
604
        "reasons": reasons and "<br/>-{}".format(reasons) or "",
605
        "sample_id": api.get_id(sample),
606
        "sample_link": get_link(api.get_url(sample), api.get_id(sample))
607
    })
608
609
    def to_valid_email_address(contact):
610
        if not contact:
611
            return None
612
        address = contact.getEmailAddress()
613
        if not is_valid_email_address(address):
614
            return None
615
        return address
616
617
    # Get the recipients
618
    _to = get_rejection_email_recipients(sample)
619
    if not _to:
620
        # Cannot send an e-mail without recipient!
621
        logger.warn("No valid recipients for {}".format(api.get_id(sample)))
622
        return None
623
624
    lab = api.get_setup().laboratory
625
    attachments = rejection_pdf and [rejection_pdf] or []
626
627
    return compose_email(
628
        from_addr=lab.getEmailAddress(),
629
        to_addr=_to,
630
        subj=_("%s has been rejected") % api.get_id(sample),
631
        body=email_body,
632
        html=True,
633
        attachments=attachments)
634