Passed
Push — 2.x ( df5950...91c842 )
by Ramon
07:45
created

senaite.core.browser.samples.invalidate_samples   B

Complexity

Total Complexity 43

Size/Duplication

Total Lines 326
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 43
eloc 198
dl 0
loc 326
rs 8.96
c 0
b 0
f 0

15 Methods

Rating   Name   Duplication   Size   Complexity  
A InvalidateSamplesView.invalidate() 0 10 2
A InvalidateSamplesView.send_invalidation_email() 0 20 2
A InvalidateSamplesView.redirect() 0 8 3
A InvalidateSamplesView.__init__() 0 6 1
A InvalidateSamplesView.get_samples() 0 12 3
A InvalidateSamplesView.get_invalidation_email() 0 30 2
B InvalidateSamplesView.get_success_message() 0 41 5
A InvalidateSamplesView.add_status_message() 0 4 1
A InvalidateSamplesView.get_recipients() 0 12 1
A InvalidateSamplesView.get_invalidation_reason() 0 9 3
C InvalidateSamplesView.__call__() 0 53 9
A InvalidateSamplesView.is_reason_required() 0 7 1
B InvalidateSamplesView.get_email_address() 0 25 6
A InvalidateSamplesView.uids() 0 10 2
A InvalidateSamplesView.get_samples_data() 0 17 2

How to fix   Complexity   

Complexity

Complex classes like senaite.core.browser.samples.invalidate_samples often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2025 by its authors.
19
# Some rights reserved, see README and LICENSE.
20
21
from collections import OrderedDict
22
from string import Template
23
24
from Products.CMFCore.WorkflowCore import WorkflowException
25
from Products.Five.browser import BrowserView
26
from Products.Five.browser.pagetemplatefile import ViewPageTemplateFile
27
from Products.PlonePAS.plugins.ufactory import PloneUser
28
from Products.PlonePAS.tools.memberdata import MemberData
29
from bika.lims import _
30
from bika.lims import api
31
from bika.lims.api.mail import compose_email
32
from bika.lims.api.mail import is_valid_email_address
33
from bika.lims.interfaces import IContact
34
from bika.lims.utils import get_link_for
35
from senaite.core.api import dtime
36
from senaite.core.api import workflow as wapi
37
from senaite.core.catalog import SAMPLE_CATALOG
38
from senaite.core.i18n import translate as t
39
40
41
class InvalidateSamplesView(BrowserView):
    """View for the invalidation of samples

    Renders a form for the samples passed through the "uids" request
    parameter and, on submit, invalidates each submitted sample and
    optionally sends a notification email to the recipients of the sample.
    """
    template = ViewPageTemplateFile("templates/invalidate_samples.pt")

    # Reason why the last call to `invalidate` failed. None if the last call
    # succeeded or if `invalidate` has not been called yet
    invalidation_error = None

    def __init__(self, context, request):
        super(InvalidateSamplesView, self).__init__(context, request)
        self.context = context
        self.request = request
        self.portal = api.get_portal()
        # URL the user is redirected to by default (see `redirect`)
        self.back_url = api.get_url(self.context)

    @property
    def uids(self):
        """Returns the uids passed through the request

        The "uids" request parameter can be either a comma-separated string
        or a sequence. Duplicates are removed and the order is preserved.
        """
        uids = self.request.form.get("uids", "")
        if api.is_string(uids):
            uids = uids.split(",")

        # Remove duplicates while keeping the order
        return list(OrderedDict.fromkeys(uids))

    @property
    def is_reason_required(self):
        """Returns whether the introduction of a reason is required for the
        invalidation of a sample
        """
        setup = api.get_setup()
        return setup.getInvalidationReasonRequired()

    def __call__(self):
        """Handles the form submission or renders the template

        - On "invalidate" submit: invalidates the submitted samples and
          redirects with a success or error message
        - On "cancel" submit: redirects with a cancellation message
        - Otherwise: renders the form
        """
        form = self.request.form

        # Form submit toggle
        form_submitted = form.get("submitted", False)
        form_invalidate = form.get("button_invalidate", False)
        form_cancel = form.get("button_cancel", False)

        # Handle invalidation
        if form_submitted and form_invalidate:
            processed = self._invalidate_submitted(form.get("samples", []))
            if not processed:
                return self.redirect(message=_(
                    "No samples were invalidated. Please ensure samples are "
                    "selected and meet the criteria for invalidation."
                ), level="error")

            # get the success message
            message = self.get_success_message(processed)
            return self.redirect(message=message)

        # Handle cancel
        if form_submitted and form_cancel:
            return self.redirect(message=_(
                "The invalidation process has been successfully cancelled."
            ))

        return self.template()

    def _invalidate_submitted(self, records):
        """Invalidates the samples from the submitted form records

        A warning status message is added for each sample that cannot be
        invalidated.

        :param records: records from the "samples" form field. The keys
            "uid", "reason" and "notify" are read from each record
        :returns: OrderedDict that maps each invalidated sample to the value
            returned by `send_invalidation_email`, or to an empty list if no
            notification was requested for the sample
        """
        processed = OrderedDict()
        for record in records:
            uid = record.get("uid", "")
            reason = record.get("reason", "").strip()
            notify = record.get("notify", "") == "on"

            # invalidate
            sample = api.get_object_by_uid(uid)
            if not self.invalidate(sample, comment=reason):
                message = _(
                    "Cannot invalidate ${sample_id}: ${error}",
                    mapping={
                        "sample_id": api.get_id(sample),
                        # reason recorded by `invalidate` on failure
                        "error": self.invalidation_error or "",
                    })
                self.add_status_message(message, level="warning")
                continue

            # keep track of the transitioned samples and recipients
            processed[sample] = []

            # notify via email and keep track of notified samples
            if notify:
                processed[sample] = self.send_invalidation_email(sample)

        return processed

    def get_success_message(self, processed):
        """Returns the success message for samples that have been processed

        :param processed: mapping of invalidated sample -> recipients of the
            notification email (a falsy value if no email was sent)
        """
        # get the sample objects. Materialized as a list because the first
        # item is accessed by index below
        samples = list(processed)
        # get the ids
        sample_ids = [api.get_id(sample) for sample in samples]
        # ids of the samples that were successfully notified by email
        notified_ids = [
            api.get_id(sample) for sample in samples if processed.get(sample)
        ]

        if len(samples) == 1:
            if notified_ids:
                return _(
                    "Sample ${sample_id} was successfully invalidated, and a "
                    "notification email has been sent to the following "
                    "recipients: ${recipients}.",
                    mapping={
                        "sample_id": sample_ids[0],
                        "recipients": processed.get(samples[0]),
                    })

            return _(
                "Sample ${sample_id} has been successfully invalidated.",
                mapping={"sample_id": sample_ids[0]}
            )

        if notified_ids:
            return _(
                "Samples ${sample_ids} were successfully invalidated, with "
                "notification emails sent for the following: ${notified_ids}.",
                mapping={
                    "sample_ids": ", ".join(sample_ids),
                    "notified_ids": ", ".join(notified_ids),
                }
            )

        return _(
            "Samples ${sample_ids} were successfully invalidated.",
            mapping={"sample_ids": ", ".join(sample_ids)}
        )

    def get_samples(self):
        """Returns a list of objects coming from the "uids" request parameter

        Only the samples for which the transition "invalidate" is allowed
        are returned.
        """
        # Remove samples that cannot be invalidated
        samples = []
        query = dict(portal_type="AnalysisRequest", UID=self.uids)
        for brain in api.search(query, SAMPLE_CATALOG):
            sample = api.get_object(brain)
            if wapi.is_transition_allowed(sample, "invalidate"):
                samples.append(sample)

        return samples

    def get_samples_data(self):
        """Yields a dictionary with the data of each sample returned by
        `get_samples`
        """
        for obj in self.get_samples():
            emails = self.get_recipients(obj)
            created = api.get_creation_date(obj)
            yield {
                "obj": obj,
                "id": api.get_id(obj),
                "uid": api.get_uid(obj),
                "title": api.get_title(obj),
                "path": api.get_path(obj),
                "url": api.get_url(obj),
                "sample_type": obj.getSampleTypeTitle(),
                "client_title": obj.getClientTitle(),
                "date": dtime.to_localized_time(created, long_format=True),
                "recipients": emails,
            }

    def get_recipients(self, sample):
        """Returns the list of email recipients for the given sample

        The emails are taken from the users with role "LabManager", the
        contact and CC contacts of the sample and the CC emails of the
        sample. Only valid email addresses are returned, without duplicates
        and in that order.
        """
        managers = api.get_users_by_roles("LabManager")
        candidates = managers + [sample.getContact()] + sample.getCCContact()
        emails = [self.get_email_address(item) for item in candidates]

        # extend with the CC emails
        emails.extend(sample.getCCEmails(as_list=True))

        # keep valid addresses only. Duplicates are removed *after* the CC
        # emails are merged, so an address that is both a contact email and a
        # CC email is returned only once
        valid = [
            email for email in emails
            if email and is_valid_email_address(email)
        ]
        return list(OrderedDict.fromkeys(valid))

    def get_email_address(self, contact_user_email):
        """Returns the email address for the contact, member or email

        :param contact_user_email: an email address, an object that provides
            IContact, a MemberData or a PloneUser
        :returns: a valid email address or None
        """
        if is_valid_email_address(contact_user_email):
            return contact_user_email

        if IContact.providedBy(contact_user_email):
            contact_email = contact_user_email.getEmailAddress()
            return self.get_email_address(contact_email)

        if isinstance(contact_user_email, MemberData):
            # resolve the user, that is handled below
            contact_user_email = contact_user_email.getUser()

        if isinstance(contact_user_email, PloneUser):
            # Try with the contact's email first
            contact = api.get_user_contact(contact_user_email)
            contact_email = self.get_email_address(contact)
            if contact_email:
                return contact_email

            # Fallback to member's email
            user_email = contact_user_email.getProperty("email")
            return self.get_email_address(user_email)

        return None

    def invalidate(self, sample, comment=""):
        """Invalidates the sample and stores a comment in action as the reason
        of invalidation

        :returns: True if the sample was invalidated. False otherwise, in
            which case the reason is kept in `self.invalidation_error`
        """
        self.invalidation_error = None
        wf = api.get_tool("portal_workflow")
        try:
            wf.doActionFor(sample, "invalidate", comment=comment)
        except WorkflowException as err_msg:
            # keep the reason, so the caller can tell the user about it
            self.invalidation_error = api.safe_unicode(err_msg)
            return False
        return True

    def send_invalidation_email(self, sample):
        """Sends an email about the invalidation to the contacts of the sample
        and if succeeds, returns back the email's "To" mime header. Returns
        None otherwise

        Any failure is reported as a warning status message, so one failing
        notification does not abort the invalidation of the other samples.
        """
        try:
            email_message = self.get_invalidation_email(sample)
            host = api.get_tool("MailHost")
            host.send(email_message, immediate=True)
            return email_message["To"]
        except Exception as err_msg:
            message = _(
                "Cannot send email for ${sample_id}: ${error}",
                mapping={
                    "sample_id": api.get_id(sample),
                    "error": api.safe_unicode(err_msg)
                })
            self.add_status_message(message, level="warning")

        return None

    def get_invalidation_email(self, sample):
        """Returns the sample invalidation MIME Message for the sample

        :raises ValueError: if there are no valid recipients for the sample
        """
        recipients = self.get_recipients(sample)
        if not recipients:
            sample_id = api.get_id(sample)
            raise ValueError("No valid recipients for {}".format(sample_id))

        # Compose the email
        subject = t(
            _("Erroneous result publication: ${sample_id}",
              mapping={"sample_id": api.get_id(sample)})
        )

        setup = api.get_setup()
        retest = sample.getRetest()
        lab_email = setup.laboratory.getEmailAddress()
        lab_address = setup.laboratory.getPrintAddress()
        # safe_substitute leaves unknown placeholders of the body untouched
        body = Template(setup.getEmailBodySampleInvalidation())
        body = body.safe_substitute({
            "lab_address": "<br/>".join(lab_address),
            "sample_id": api.get_id(sample),
            "sample_link": get_link_for(sample, csrf=False),
            "retest_id": api.get_id(retest),
            "retest_link": get_link_for(retest, csrf=False),
            "reason": self.get_invalidation_reason(sample),
        })

        return compose_email(from_addr=lab_email, to_addr=recipients,
                             subj=subject, body=body, html=True)

    def get_invalidation_reason(self, sample):
        """Returns the reason of the invalidation, if any. Returns empty string
        otherwise

        The reason is the comment of the first "invalidate" event found in
        the review history of the sample.
        """
        history = api.get_review_history(sample)
        for event in history:
            if event.get("action") == "invalidate":
                return event.get("comments", "")
        return ""

    def redirect(self, redirect_url=None, message=None, level="info"):
        """Redirect with a message

        :param redirect_url: target URL. Defaults to `self.back_url`
        :param message: status message to display, if any
        :param level: level of the status message
        """
        if redirect_url is None:
            redirect_url = self.back_url
        if message is not None:
            self.add_status_message(message, level)
        return self.request.response.redirect(redirect_url)

    def add_status_message(self, message, level="info"):
        """Set a portal status message
        """
        return self.context.plone_utils.addPortalMessage(message, level)
326