Passed
Push — master ( c72550...021814 )
by Ramon
05:39
created

WorkflowActionRejectAdapter.__call__()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 3
rs 10
c 0
b 0
f 0
cc 1
nop 3
1
from email.mime.multipart import MIMEMultipart
2
from email.mime.text import MIMEText
3
from email.Utils import formataddr
4
from string import Template
5
6
from bika.lims import api
7
from bika.lims import bikaMessageFactory as _
8
from bika.lims import logger
9
from bika.lims.browser.workflow import RequestContextAware
10
from bika.lims.browser.workflow import WorkflowActionGenericAdapter
11
from bika.lims.content.analysisspec import ResultsRangeDict
12
from bika.lims.interfaces import IAnalysisRequest
13
from bika.lims.interfaces import IWorkflowActionUIDsAdapter
14
from bika.lims.utils import encode_header
15
from bika.lims.utils import t
16
from DateTime import DateTime
17
from Products.CMFPlone.utils import safe_unicode
18
from zope.component.interfaces import implements
19
20
21
class WorkflowActionCopyToNewAdapter(RequestContextAware):
    """Handles the 'copy_to_new' action of Analysis Requests.

    Redirects to the ``ar_add`` view of the back url, passing the number of
    selected UIDs as ``ar_count`` and the UIDs themselves (comma-separated)
    as ``copy_from``.
    """
    implements(IWorkflowActionUIDsAdapter)

    def __call__(self, action, uids):
        """Redirects to the add form pre-filled from the given uids

        :param action: id of the action being handled (not used here)
        :param uids: UIDs of the Analysis Requests to copy from
        :returns: whatever ``self.redirect`` returns
        """
        count = len(uids)
        copy_from = ",".join(uids)
        target = "{}/ar_add?ar_count={}&copy_from={}".format(
            self.back_url, count, copy_from)
        return self.redirect(redirect_url=target)
30
31
32
class WorkflowActionPrintStickersAdapter(RequestContextAware):
    """Handles the 'print_stickers' action of Analysis Requests.

    Redirects to the ``sticker`` view of the back url, using the sticker
    template returned by the setup's ``getAutoStickerTemplate``.
    """
    implements(IWorkflowActionUIDsAdapter)

    def __call__(self, action, uids):
        """Redirects to the sticker view for the given uids

        :param action: id of the action being handled (not used here)
        :param uids: UIDs of the items to render stickers for
        :returns: whatever ``self.redirect`` returns
        """
        base_url = self.back_url
        template = self.context.bika_setup.getAutoStickerTemplate()
        items = ",".join(uids)
        target = "{}/sticker?template={}&items={}".format(
            base_url, template, items)
        return self.redirect(redirect_url=target)
41
42
43
class WorkflowActionCreatePartitionsAdapter(RequestContextAware):
    """Handles the partition creation action of Analysis Requests.

    Redirects to the ``partition_magic`` view of the back url, passing the
    selected UIDs (comma-separated) in the ``uids`` parameter.
    """
    implements(IWorkflowActionUIDsAdapter)

    def __call__(self, action, uids):
        """Redirects to the partitioning view for the given uids

        :param action: id of the action being handled (not used here)
        :param uids: UIDs of the Analysis Requests to be partitioned
        :returns: whatever ``self.redirect`` returns
        """
        joined_uids = ",".join(uids)
        target = "{}/partition_magic?uids={}".format(
            self.back_url, joined_uids)
        return self.redirect(redirect_url=target)
51
52
53
class WorkflowActionPublishAdapter(RequestContextAware):
    """Handles the 'publish'-like actions of Analysis Requests.

    Unlike the other redirect adapters, the target url is built from the
    portal url rather than from the back url.
    """
    implements(IWorkflowActionUIDsAdapter)

    def __call__(self, action, uids):
        """Redirects to the publish view for the given uids

        :param action: id of the action being handled (not used here)
        :param uids: UIDs of the Analysis Requests to publish
        :returns: whatever ``self.redirect`` returns
        """
        portal_url = self.context.portal_url()
        items = ",".join(uids)
        target = "{}/analysisrequests/publish?items={}".format(
            portal_url, items)
        return self.redirect(redirect_url=target)
63
64
65
class WorkflowActionRejectAdapter(RequestContextAware):
    """Handles the 'reject' action of Analysis Requests.

    Redirects to the ``reject_samples`` view of the back url, passing the
    selected UIDs (comma-separated) in the ``uids`` parameter.
    """
    implements(IWorkflowActionUIDsAdapter)

    def __call__(self, action, uids):
        """Redirects to the rejection view for the given uids

        :param action: id of the action being handled (not used here)
        :param uids: UIDs of the Analysis Requests to reject
        :returns: whatever ``self.redirect`` returns
        """
        joined_uids = ",".join(uids)
        target = "{}/reject_samples?uids={}".format(
            self.back_url, joined_uids)
        return self.redirect(redirect_url=target)
73
74
75
class WorkflowActionReceiveAdapter(WorkflowActionGenericAdapter):
    """Handles the receive action of Analysis Requests.

    After the transition, the user is redirected (in this order of
    precedence) to the partitioning view, to the sticker auto-print view or
    to the generic success page.
    """

    def __call__(self, action, objects):
        """Triggers the action and redirects to the suitable follow-up view

        :param action: id of the action to perform
        :param objects: objects the action has to be performed against
        :returns: the result of ``self.redirect`` or ``self.success``
        """
        received = self.do_action(action, objects)
        if not received:
            return self.redirect(message=_("No changes made"), level="warning")

        # Partitioning is evaluated against all objects passed in, not only
        # against those that were effectively transitioned
        to_partition = [obj for obj in objects
                        if self.is_auto_partition_required(obj)]
        if to_partition:
            # Redirect to the partitioning view
            joined = ",".join([api.get_uid(obj) for obj in to_partition])
            target = "{}/partition_magic?uids={}".format(self.back_url, joined)
            return self.redirect(redirect_url=target)

        if not self.is_auto_print_stickers_enabled():
            # Nothing else to do: redirect the user to success page
            return self.success(received)

        # Redirect to the auto-print stickers view
        joined = ",".join([api.get_uid(obj) for obj in received])
        template = self.context.bika_setup.getAutoStickerTemplate()
        target = "{}/sticker?autoprint=1&template={}&items={}".format(
            self.back_url, template, joined)
        return self.redirect(redirect_url=target)

    def is_auto_partition_required(self, brain_or_object):
        """Tells whether the object passed in has to be partitioned

        Evaluates to a true value only for Analysis Requests whose template
        exists and has a true ``getAutoPartition`` value. Note the value
        returned is not necessarily a boolean.
        """
        obj = api.get_object(brain_or_object)
        if IAnalysisRequest.providedBy(obj):
            template = obj.getTemplate()
            return template and template.getAutoPartition()
        return False

    def is_auto_print_stickers_enabled(self):
        """Tells whether "receive" is among the setup's auto-print settings
        """
        auto_print = self.context.bika_setup.getAutoPrintStickers()
        return "receive" in auto_print
115
116
117
class WorkflowActionInvalidateAdapter(WorkflowActionGenericAdapter):
    """Adapter in charge of Analysis Request invalidate action

    Besides triggering the transition, it emails lab managers and the
    sample's contacts when the setup's ``getNotifyOnSampleInvalidation``
    is enabled.
    """

    def __call__(self, action, objects):
        """Triggers the action and notifies the contacts, if required

        :param action: id of the action to perform
        :param objects: objects the action has to be performed against
        :returns: the result of ``self.redirect`` or ``self.success``
        """
        transitioned = self.do_action(action, objects)
        if not transitioned:
            return self.redirect(message=_("No changes made"), level="warning")

        # Need to notify client contacts?
        if self.context.bika_setup.getNotifyOnSampleInvalidation():
            # Alert the client contacts who ordered the results, stating that
            # a possible mistake has been picked up and is under
            # investigation.
            for sample in transitioned:
                self.notify_ar_retract(sample)

        # Redirect the user to success page
        return self.success(transitioned)

    def notify_ar_retract(self, sample):
        """Sends an email notification to sample's client contact if the sample
        passed in has a retest associated

        A failure while sending the email does not raise: a warning portal
        message is added instead.
        """
        sample_id = api.get_id(sample)
        retest = sample.getRetest()
        if not retest:
            logger.warn("No retest found for {}. And it should!"
                        .format(sample_id))
            return

        # Email fields
        # NOTE(review): the message is formatted before being handed to t();
        # confirm the resulting string is still picked up for translation
        subject = t(_("Erroneous result publication from {}").format(sample_id))
        emails_lab = self.get_lab_managers_formatted_emails()
        emails_sample = self.get_sample_contacts_formatted_emails(sample)
        # Same address might be both a lab manager and a sample contact
        recipients = list(set(emails_lab + emails_sample))

        msg = MIMEMultipart("related")
        msg["Subject"] = subject
        msg["From"] = self.get_laboratory_formatted_email()
        msg["To"] = ", ".join(recipients)
        body = self.get_email_body(sample)
        msg_txt = MIMEText(safe_unicode(body).encode('utf-8'), _subtype='html')
        msg.preamble = 'This is a multi-part MIME message.'
        msg.attach(msg_txt)

        # Send the email
        try:
            host = api.get_tool("MailHost")
            host.send(msg.as_string(), immediate=True)
        except Exception as err_msg:
            # Best-effort: the sample has been invalidated already, so only
            # warn the user about the notification failure
            message = _("Unable to send an email to alert lab "
                        "client contacts that the Sample has been "
                        "retracted: ${error}",
                        mapping={'error': safe_unicode(err_msg)})
            self.context.plone_utils.addPortalMessage(message, 'warning')

    def get_email_body(self, sample):
        """Returns the email body text

        The body is the setup's ``getEmailBodySampleInvalidation`` template
        with the placeholders sample_link, retest_link, sample_id, retest_id
        and lab_address substituted. Unknown placeholders are left as-is
        (``safe_substitute``).
        """
        retest = sample.getRetest()
        lab_address = api.get_bika_setup().laboratory.getPrintAddress()
        setup = api.get_setup()
        body = Template(setup.getEmailBodySampleInvalidation())\
            .safe_substitute(
            dict(sample_link=self.get_html_link(sample),
                 retest_link=self.get_html_link(retest),
                 sample_id=api.get_id(sample),
                 retest_id=api.get_id(retest),
                 lab_address="<br/>".join(lab_address)))
        return body

    def get_formatted_email(self, email_name):
        """Formats a email

        :param email_name: tuple of (display name, email address)
        :returns: the address as returned by ``formataddr``, with the
            display name passed through ``encode_header``
        """
        return formataddr((encode_header(email_name[0]), email_name[1]))

    def get_laboratory_formatted_email(self):
        """Returns the laboratory email formatted
        """
        lab = api.get_bika_setup().laboratory
        return self.get_formatted_email((lab.getName(), lab.getEmailAddress()))

    def get_lab_managers_formatted_emails(self):
        """Returns a list with lab managers formatted emails
        """
        users = api.get_users_by_roles("LabManager")
        # Explicit lists, because the caller concatenates the result
        return [self.get_formatted_email((user.getProperty("fullname"),
                                          user.getProperty("email")))
                for user in users]

    def get_contact_formatted_email(self, contact):
        """Returns a string with the formatted email for the given contact
        """
        contact_name = contact.Title()
        contact_email = contact.getEmailAddress()
        return self.get_formatted_email((contact_name, contact_email))

    def get_sample_contacts_formatted_emails(self, sample):
        """Returns a list with the formatted emails from sample contacts

        Contacts are the sample's primary contact plus its CC contacts,
        without duplicates. Missing (None) contacts are skipped.
        """
        contacts = [sample.getContact()] + sample.getCCContact()
        # A sample without primary contact would otherwise make the whole
        # notification fail with an AttributeError
        contacts = set([c for c in contacts if c is not None])
        return [self.get_contact_formatted_email(c) for c in contacts]

    def get_html_link(self, obj):
        """Returns an html formatted link for the given object
        """
        return "<a href='{}'>{}</a>".format(api.get_url(obj), api.get_id(obj))
226
227
228
class WorkflowActionPrintSampleAdapter(WorkflowActionGenericAdapter):
    """Adapter in charge of Analysis Request print_sample action

    No workflow transition is triggered: the action only stores the printed
    date in the most recently published results report of each sample.
    """

    def __call__(self, action, objects):
        """Flags the samples as printed and redirects the user

        :param action: id of the action being handled (not used here)
        :param objects: samples to be flagged as printed
        :returns: the result of ``self.redirect`` or ``self.success``
        """
        # Update printed times
        transitioned = [obj for obj in objects if self.set_printed_time(obj)]
        if not transitioned:
            return self.redirect(message=_("No changes made"), level="warning")

        # Redirect the user to success page
        return self.success(transitioned)

    def set_printed_time(self, sample):
        """Updates the printed time of the last results report from the sample

        :returns: False if the sample is not in "published" status or has no
            results report; True otherwise, even if the last report had a
            printed date already (which is left untouched)
        """
        if api.get_workflow_status_of(sample) != "published":
            return False
        reports = sample.objectValues("ARReport")
        if not reports:
            # Published, but without any report to flag as printed
            return False
        reports = sorted(reports, key=lambda report: report.getDatePublished())
        last_report = reports[-1]
        if not last_report.getDatePrinted():
            last_report.setDatePrinted(DateTime())
            sample.reindexObject(idxs=["getPrinted"])
        return True
253
254
255
class WorkflowActionSampleAdapter(WorkflowActionGenericAdapter):
    """Handles the sample action of Analysis Requests.

    The Sampler and the Date Sampled are stored first; the transition is
    only triggered for the samples that end up with both values set.
    """

    def __call__(self, action, objects):
        """Stores the sampling info and triggers the action

        :param action: id of the action to perform
        :param objects: samples the action has to be performed against
        :returns: the result of ``self.redirect`` or ``self.success``
        """
        # Keep only the samples with Sampler and DateSampled assigned
        candidates = [obj for obj in objects if self.set_sampler_info(obj)]

        # Trigger "sample" transition, unless there is nothing to transition
        done = self.do_action(action, candidates) if candidates else None
        if not done:
            return self.redirect(message=_("No changes made"), level="warning")

        # Redirect the user to success page
        return self.success(done)

    def set_sampler_info(self, sample):
        """Assigns the Sampler and the Date Sampled submitted in the request

        Returns True if the sample had both values set already or if both
        could be resolved and stored. Returns False, without modifying the
        sample, if any of the two values is missing.
        """
        current_sampler = sample.getSampler()
        current_date = sample.getDateSampled()
        if current_sampler and current_date:
            # Nothing to assign: both values are in place
            return True

        # Values from the request, falling back to those already stored
        sampler = self.get_form_value("Sampler", sample, current_sampler)
        sampled = self.get_form_value("getDateSampled", sample, current_date)
        if not (sampler and sampled):
            return False

        sample.setSampler(sampler)
        sample.setDateSampled(DateTime(sampled))
        return True
289
290
291
class WorkflowActionPreserveAdapter(WorkflowActionGenericAdapter):
    """Adapter in charge of Analysis Request preserve action

    The Preserver and the Date Preserved are stored first; the transition
    is only triggered for the samples that end up with both values set.
    """

    def __call__(self, action, objects):
        """Stores the preservation info and triggers the action

        :param action: id of the action to perform
        :param objects: samples the action has to be performed against
        :returns: the result of ``self.redirect`` or ``self.success``
        """
        # Assign the Preserver and DatePreserved
        transitioned = [obj for obj in objects if self.set_preserver_info(obj)]
        if not transitioned:
            return self.redirect(message=_("No changes made"), level="warning")

        # Trigger "preserve" transition
        transitioned = self.do_action(action, transitioned)
        if not transitioned:
            return self.redirect(message=_("No changes made"), level="warning")

        # Redirect the user to success page
        return self.success(transitioned)

    def set_preserver_info(self, sample):
        """Updates the Preserver and the Date Preserved with the values provided
        in the request.

        Returns True if the sample had both values set already or if both
        could be resolved and stored. Returns False, without modifying the
        sample, if either Preserver or DatePreserved is missing.
        """
        if sample.getPreserver() and sample.getDatePreserved():
            # Preserver and Date Preserved already set. This is correct
            return True
        preserver = self.get_form_value("Preserver", sample,
                                        sample.getPreserver())
        # The sample must be passed in as second argument, same as above:
        # the third argument is the fallback value
        preserved = self.get_form_value("getDatePreserved", sample,
                                        sample.getDatePreserved())
        if not all([preserver, preserved]):
            return False
        sample.setPreserver(preserver)
        # Setter counterpart of getDatePreserved
        sample.setDatePreserved(DateTime(preserved))
        return True
326
327
328
class WorkflowActionScheduleSamplingAdapter(WorkflowActionGenericAdapter):
    """Adapter in charge of Analysis request schedule sampling action

    The scheduled Sampler and the Sampling Date are stored first; the
    transition is only triggered for the samples that end up with both
    values set.
    """

    def __call__(self, action, objects):
        """Stores the scheduled sampling info and triggers the action

        :param action: id of the action to perform
        :param objects: samples the action has to be performed against
        :returns: the result of ``self.redirect`` or ``self.success``
        """
        # Assign the scheduled Sampler and Sampling Date
        transitioned = [obj for obj in objects if self.set_sampling_info(obj)]
        if not transitioned:
            return self.redirect(message=_("No changes made"), level="warning")

        # Trigger "schedule_sampling" transition
        transitioned = self.do_action(action, transitioned)
        if not transitioned:
            return self.redirect(message=_("No changes made"), level="warning")

        # Redirect the user to success page
        return self.success(transitioned)

    def set_sampling_info(self, sample):
        """Updates the scheduled Sampling sampler and the Sampling Date with the
        values provided in the request.

        Returns True if the sample had both values set already or if both
        could be resolved and stored. Returns False, without modifying the
        sample, if either the Sampling sampler or the Sampling Date is
        missing.
        """
        if sample.getScheduledSamplingSampler() and sample.getSamplingDate():
            # Scheduled sampler and Sampling Date already set
            return True
        sampler = self.get_form_value("getScheduledSamplingSampler", sample,
                                      sample.getScheduledSamplingSampler())
        # The sample must be passed in as second argument, same as above:
        # the third argument is the fallback value
        sampled = self.get_form_value("getSamplingDate", sample,
                                      sample.getSamplingDate())
        if not all([sampler, sampled]):
            return False
        sample.setScheduledSamplingSampler(sampler)
        sample.setSamplingDate(DateTime(sampled))
        return True
362
363
364
class WorkflowActionSaveAnalysesAdapter(WorkflowActionGenericAdapter):
    """Adapter in charge of "save analyses" action in Analysis Request.
    """

    def __call__(self, action, objects):
        """The objects passed in are Analysis Services and the context is the
        Analysis Request

        :param action: id of the action being handled (not used here)
        :param objects: not used; the service UIDs are read from the request
        :returns: the result of ``self.redirect`` or ``self.success``
        """
        sample = self.context
        if not IAnalysisRequest.providedBy(sample):
            return self.redirect(message=_("No changes made"), level="warning")

        # NOTE: https://github.com/senaite/senaite.core/issues/1276
        #
        # Explicitly lookup the UIDs from the request, because the default
        # behavior of the method `get_uids` in `WorkflowActionGenericAdapter`
        # falls back to the UID of the current context if no UIDs were
        # submitted, which is in that case an `AnalysisRequest`.
        uids = self.get_uids_from_request()
        services = [api.get_object(uid) for uid in uids]

        # Get form values
        form = self.request.form
        prices = form.get("Price", [None])[0]
        hidden = [{"uid": api.get_uid(service),
                   "hidden": self.is_hidden(service)}
                  for service in services]
        specs = [self.get_specs(service) for service in services]

        # Set new analyses to the sample
        sample.setAnalysisServicesSettings(hidden)
        sample.setAnalyses(uids, prices=prices, specs=specs, hidden=hidden)

        # Just in case new analyses have been added while the Sample was in a
        # "non-open" state (e.g. "to_be_verified")
        self.do_action("rollback_to_receive", [sample])

        # Reindex the analyses that have been added
        for analysis in sample.objectValues("Analysis"):
            analysis.reindexObject()

        # Redirect the user to success page. The result must be returned,
        # same as the rest of adapters do
        return self.success([sample])

    def is_hidden(self, service):
        """Returns whether the request Hidden param for the given obj is True
        """
        uid = api.get_uid(service)
        hidden_ans = self.request.form.get("Hidden", {})
        return hidden_ans.get(uid, "") == "on"

    def get_specs(self, service):
        """Returns the analysis specs available in the request for the given uid

        Starts from a copy of an empty ``ResultsRangeDict`` for the service
        and overrides each of its keys with the value submitted in the
        request for the service's uid, if any.
        """
        uid = api.get_uid(service)
        keyword = service.getKeyword()
        specs = ResultsRangeDict(keyword=keyword, uid=uid).copy()
        for key in specs.keys():
            # Each form entry is read as a list whose first item maps
            # uids to values
            specs_value = self.request.form.get(key, [{}])[0].get(uid, None)
            specs[key] = specs_value or specs.get(key)
        return specs
425