Passed
Push — master ( d539f4...a81f8a )
by Ramon
05:14 queued 14s
created

WorkflowActionSaveAnalysesAdapter.is_hidden()   A

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 4
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 1
nop 2
1
from email.mime.multipart import MIMEMultipart
2
from email.mime.text import MIMEText
3
from string import Template
4
5
from DateTime import DateTime
6
from Products.CMFPlone.utils import safe_unicode
7
from bika.lims import api
8
from bika.lims import bikaMessageFactory as _
9
from bika.lims import logger
10
from bika.lims.browser.workflow import WorkflowActionGenericAdapter, \
11
    RequestContextAware
12
from bika.lims.content.analysisspec import ResultsRangeDict
13
from bika.lims.interfaces import IAnalysisRequest, IWorkflowActionUIDsAdapter
14
from bika.lims.utils import encode_header
15
from bika.lims.utils import t
16
from email.Utils import formataddr
17
from zope.component.interfaces import implements
18
19
20
class WorkflowActionCopyToNewAdapter(RequestContextAware):
    """Handles the 'copy_to_new' action of Analysis Requests by redirecting
    the user to the ar_add view with the selected UIDs as copy sources
    """
    implements(IWorkflowActionUIDsAdapter)

    def __call__(self, action, uids):
        """Redirects to the ar_add view

        :param action: id of the action being handled (not used here)
        :param uids: list of UIDs of the selected items
        :return: whatever self.redirect returns
        """
        base_url = self.back_url
        # ar_count equals the number of selected items; copy_from carries
        # their UIDs as a comma-separated string
        copy_from = ",".join(uids)
        target = "{}/ar_add?ar_count={}&copy_from={}".format(
            base_url, len(uids), copy_from)
        return self.redirect(redirect_url=target)
29
30
31
class WorkflowActionPrintStickersAdapter(RequestContextAware):
    """Handles the 'print_stickers' action of Analysis Requests by
    redirecting the user to the sticker view
    """
    implements(IWorkflowActionUIDsAdapter)

    def __call__(self, action, uids):
        """Redirects to the sticker view for the selected items

        :param action: id of the action being handled (not used here)
        :param uids: list of UIDs of the selected items
        :return: whatever self.redirect returns
        """
        base_url = self.back_url
        # The sticker template is the one returned by the setup's
        # getAutoStickerTemplate
        setup = self.context.bika_setup
        template = setup.getAutoStickerTemplate()
        items = ",".join(uids)
        target = "{}/sticker?template={}&items={}".format(
            base_url, template, items)
        return self.redirect(redirect_url=target)
40
41
42
class WorkflowActionCreatePartitionsAdapter(RequestContextAware):
    """Handles the partition creation action of Analysis Requests by
    redirecting the user to the partition_magic view
    """
    implements(IWorkflowActionUIDsAdapter)

    def __call__(self, action, uids):
        """Redirects to the partition_magic view for the selected items

        :param action: id of the action being handled (not used here)
        :param uids: list of UIDs of the selected items
        :return: whatever self.redirect returns
        """
        base_url = self.back_url
        joined_uids = ",".join(uids)
        target = "{}/partition_magic?uids={}".format(base_url, joined_uids)
        return self.redirect(redirect_url=target)
50
51
52
class WorkflowActionPublishAdapter(RequestContextAware):
    """Handles 'publish'-like actions of Analysis Requests by redirecting
    the user to the publish view
    """
    implements(IWorkflowActionUIDsAdapter)

    def __call__(self, action, uids):
        """Redirects to the analysisrequests/publish view

        :param action: id of the action being handled (not used here)
        :param uids: list of UIDs of the selected items
        :return: whatever self.redirect returns
        """
        # Unlike sibling adapters, the target hangs from the portal root
        # instead of from self.back_url
        portal_url = self.context.portal_url()
        items = ",".join(uids)
        target = "{}/analysisrequests/publish?items={}".format(
            portal_url, items)
        return self.redirect(redirect_url=target)
62
63
64
class WorkflowActionReceiveAdapter(WorkflowActionGenericAdapter):
    """Handles the receive action of Analysis Requests, with follow-up
    redirections to the partitioning or sticker views when applicable
    """

    def __call__(self, action, objects):
        """Performs the action against the objects and redirects the user

        :param action: id of the action to perform
        :param objects: objects (or brains) the action is performed against
        :return: whatever self.redirect / self.success returns
        """
        received = self.do_action(action, objects)
        if not received:
            return self.redirect(message=_("No changes made"), level="warning")

        # Partitioning has priority over stickers. Note the check runs over
        # all the objects passed in, not only over those transitioned
        to_partition = [obj for obj in objects
                        if self.is_auto_partition_required(obj)]
        if to_partition:
            joined = ",".join([api.get_uid(obj) for obj in to_partition])
            target = "{}/partition_magic?uids={}".format(self.back_url, joined)
            return self.redirect(redirect_url=target)

        if self.is_auto_print_stickers_enabled():
            # Stickers are auto-printed for the transitioned objects only
            joined = ",".join([api.get_uid(obj) for obj in received])
            template = self.context.bika_setup.getAutoStickerTemplate()
            target = "{}/sticker?autoprint=1&template={}&items={}".format(
                self.back_url, template, joined)
            return self.redirect(redirect_url=target)

        # Nothing else to do: success page
        return self.success(received)

    def is_auto_partition_required(self, brain_or_object):
        """Returns a truthy value if the object passed in is an Analysis
        Request with a template for which getAutoPartition is truthy
        """
        sample = api.get_object(brain_or_object)
        if not IAnalysisRequest.providedBy(sample):
            return False
        template = sample.getTemplate()
        if not template:
            # Keep returning the (falsy) template itself
            return template
        return template.getAutoPartition()

    def is_auto_print_stickers_enabled(self):
        """Returns whether "receive" is present in the value returned by the
        setup's getAutoPrintStickers
        """
        setting = self.context.bika_setup.getAutoPrintStickers()
        return "receive" in setting
104
105
106
class WorkflowActionInvalidateAdapter(WorkflowActionGenericAdapter):
    """Adapter in charge of Analysis Request invalidate action
    """

    def __call__(self, action, objects):
        """Performs the action against the objects and, if enabled in setup
        (getNotifyOnARRetract), sends an email notification for each sample
        that has been transitioned

        :param action: id of the action to perform
        :param objects: objects the action is performed against
        :return: whatever self.redirect / self.success returns
        """
        transitioned = self.do_action(action, objects)
        if not transitioned:
            return self.redirect(message=_("No changes made"), level="warning")

        # Need to notify client contacts?
        if self.context.bika_setup.getNotifyOnARRetract():
            # Alert the client contacts who ordered the results, stating that
            # a possible mistake has been picked up and is under
            # investigation.
            for sample in transitioned:
                self.notify_ar_retract(sample)

        # Redirect the user to success page
        return self.success(transitioned)

    def notify_ar_retract(self, sample):
        """Sends an email notification to lab managers and to the sample's
        contacts if the sample passed in has a retest associated. A failure
        while sending the email is reported as a portal warning message
        instead of being raised.
        """
        retest = sample.getRetest()
        if not retest:
            logger.warning("No retest found for {}. And it should!"
                           .format(api.get_id(sample)))
            return

        # Email fields
        sample_id = api.get_id(sample)
        subject = t(_("Erroneous result publication from {}").format(sample_id))
        lab_address = api.get_bika_setup().laboratory.getPrintAddress()
        emails_lab = self.get_lab_managers_formatted_emails()
        emails_sample = self.get_sample_contacts_formatted_emails(sample)
        # list() keeps the concatenation working regardless of whether the
        # helpers return lists or other iterables; set() removes duplicates
        recipients = list(set(list(emails_lab) + list(emails_sample)))

        msg = MIMEMultipart("related")
        msg["Subject"] = subject
        msg["From"] = self.get_laboratory_formatted_email()
        msg["To"] = ", ".join(recipients)
        body = Template("Some errors have been detected in the results report "
                        "published from the Sample $sample_link. The Sample "
                        "$retest_link has been created automatically and the "
                        "previous has been invalidated.<br/>"
                        "The possible mistake has been picked up and is under "
                        "investigation.<br/><br/>"
                        "$lab_address").safe_substitute(
            dict(sample_link=self.get_html_link(sample),
                 retest_link=self.get_html_link(retest),
                 lab_address="<br/>".join(lab_address)))
        msg_txt = MIMEText(safe_unicode(body).encode('utf-8'), _subtype='html')
        msg.preamble = 'This is a multi-part MIME message.'
        msg.attach(msg_txt)

        # Send the email. Deliberately best-effort: the invalidation already
        # took place, so a mail failure is only reported to the user
        try:
            host = api.get_tool("MailHost")
            host.send(msg.as_string(), immediate=True)
        except Exception as err_msg:
            message = _("Unable to send an email to alert lab "
                        "client contacts that the Sample has been "
                        "retracted: ${error}",
                        mapping={'error': safe_unicode(err_msg)})
            self.context.plone_utils.addPortalMessage(message, 'warning')

    def get_formatted_email(self, email_name):
        """Formats an email address for its use in email headers

        :param email_name: tuple of (email address, display name)
        :return: string in the form "display name <email address>"
        """
        email, name = email_name[0], email_name[1]
        # formataddr expects a (realname, email_address) pair, that is, the
        # reverse of the (email, name) tuples this class works with
        return formataddr((encode_header(name), email))

    def get_laboratory_formatted_email(self):
        """Returns the laboratory email formatted
        """
        lab = api.get_bika_setup().laboratory
        return self.get_formatted_email((lab.getEmailAddress(), lab.getName()))

    def get_lab_managers_formatted_emails(self):
        """Returns a list with lab managers formatted emails
        """
        users = api.get_users_by_roles("LabManager")
        return [self.get_formatted_email((user.getProperty("email"),
                                          user.getProperty("fullname")))
                for user in users]

    def get_contact_formatted_email(self, contact):
        """Returns a string with the formatted email for the given contact
        """
        contact_name = contact.Title()
        contact_email = contact.getEmailAddress()
        return self.get_formatted_email((contact_email, contact_name))

    def get_sample_contacts_formatted_emails(self, sample):
        """Returns a list with the formatted emails from sample contacts
        (primary contact plus CC contacts, without duplicates)
        """
        contacts = set([sample.getContact()] + list(sample.getCCContact()))
        # A sample without primary contact contributes a None entry
        return [self.get_contact_formatted_email(contact)
                for contact in contacts if contact is not None]

    def get_html_link(self, obj):
        """Returns an html formatted link for the given object
        """
        return "<a href='{}'>{}</a>".format(api.get_url(obj), api.get_id(obj))
210
211
212
class WorkflowActionPrintSampleAdapter(WorkflowActionGenericAdapter):
    """Adapter in charge of Analysis Request print_sample action
    """

    def __call__(self, action, objects):
        """Sets the printed time to the objects passed in and redirects

        :param action: id of the action being handled (not used here, no
            workflow transition is triggered)
        :param objects: samples to be flagged as printed
        :return: whatever self.redirect / self.success returns
        """
        # Update printed times
        transitioned = [obj for obj in objects if self.set_printed_time(obj)]
        if not transitioned:
            return self.redirect(message=_("No changes made"), level="warning")

        # Redirect the user to success page
        return self.success(transitioned)

    def set_printed_time(self, sample):
        """Updates the printed time of the last results report from the sample

        :return: False if the sample is not in "published" status or has no
            results report; True otherwise
        """
        if api.get_workflow_status_of(sample) != "published":
            return False
        reports = sample.objectValues("ARReport")
        if not reports:
            # No report to flag as printed; avoids an IndexError below
            return False
        reports = sorted(reports, key=lambda report: report.getDatePublished())
        last_report = reports[-1]
        if not last_report.getDatePrinted():
            # Only the first print is recorded
            last_report.setDatePrinted(DateTime())
            sample.reindexObject(idxs=["getPrinted"])
        return True
237
238
239
class WorkflowActionSampleAdapter(WorkflowActionGenericAdapter):
    """Handles the sample action of Analysis Requests
    """

    def __call__(self, action, objects):
        """Stores the sampler information and performs the action

        :param action: id of the action to perform
        :param objects: samples the action is performed against
        :return: whatever self.redirect / self.success returns
        """
        # Only samples with Sampler and DateSampled assigned can proceed
        candidates = [obj for obj in objects if self.set_sampler_info(obj)]
        if not candidates:
            return self.redirect(message=_("No changes made"), level="warning")

        # Trigger the transition for the candidates
        done = self.do_action(action, candidates)
        if not done:
            return self.redirect(message=_("No changes made"), level="warning")

        # Success page
        return self.success(done)

    def set_sampler_info(self, sample):
        """Updates the Sampler and the Date Sampled of the sample with the
        values provided in the request. Returns True if the sample ends up
        with both values set; False if any of them is missing
        """
        current_sampler = sample.getSampler()
        if current_sampler and sample.getDateSampled():
            # Both values already assigned, nothing to update
            return True

        # Values from the request, falling back to those stored in the sample
        sampler = self.get_form_value("Sampler", sample, sample.getSampler())
        date_sampled = self.get_form_value("getDateSampled", sample,
                                           sample.getDateSampled())
        if not (sampler and date_sampled):
            return False

        sample.setSampler(sampler)
        sample.setDateSampled(DateTime(date_sampled))
        return True
273
274
275
class WorkflowActionPreserveAdapter(WorkflowActionGenericAdapter):
    """Adapter in charge of Analysis Request preserve action
    """

    def __call__(self, action, objects):
        """Stores the preserver information and performs the action

        :param action: id of the action to perform
        :param objects: samples the action is performed against
        :return: whatever self.redirect / self.success returns
        """
        # Assign the Preserver and DatePreserved
        transitioned = [obj for obj in objects if self.set_preserver_info(obj)]
        if not transitioned:
            return self.redirect(message=_("No changes made"), level="warning")

        # Trigger "preserve" transition
        transitioned = self.do_action(action, transitioned)
        if not transitioned:
            return self.redirect(message=_("No changes made"), level="warning")

        # Redirect the user to success page
        return self.success(transitioned)

    def set_preserver_info(self, sample):
        """Updates the Preserver and the Date Preserved with the values
        provided in the request. Returns True if the sample ends up with both
        values set; False if any of them is missing
        """
        if sample.getPreserver() and sample.getDatePreserved():
            # Preserver and Date Preserved already set. This is correct
            return True
        preserver = self.get_form_value("Preserver", sample,
                                        sample.getPreserver())
        # The sample must be passed in, as in the call above; otherwise the
        # default value would be taken as the object
        preserved = self.get_form_value("getDatePreserved", sample,
                                        sample.getDatePreserved())
        if not all([preserver, preserved]):
            return False
        sample.setPreserver(preserver)
        sample.setDatePreserved(DateTime(preserved))
        return True
310
311
312
class WorkflowActionScheduleSamplingAdapter(WorkflowActionGenericAdapter):
    """Adapter in charge of Analysis request schedule sampling action
    """

    def __call__(self, action, objects):
        """Stores the scheduled sampling information and performs the action

        :param action: id of the action to perform
        :param objects: samples the action is performed against
        :return: whatever self.redirect / self.success returns
        """
        # Assign the scheduled Sampler and Sampling Date
        transitioned = [obj for obj in objects if self.set_sampling_info(obj)]
        if not transitioned:
            return self.redirect(message=_("No changes made"), level="warning")

        # Trigger "schedule_sampling" transition
        transitioned = self.do_action(action, transitioned)
        if not transitioned:
            return self.redirect(message=_("No changes made"), level="warning")

        # Redirect the user to success page
        return self.success(transitioned)

    def set_sampling_info(self, sample):
        """Updates the scheduled Sampling sampler and the Sampling Date with
        the values provided in the request. Returns True if the sample ends up
        with both values set; False if any of them is missing
        """
        if sample.getScheduledSamplingSampler() and sample.getSamplingDate():
            # Both values already set, nothing to update
            return True
        sampler = self.get_form_value("getScheduledSamplingSampler", sample,
                                      sample.getScheduledSamplingSampler())
        # The sample must be passed in, as in the call above; otherwise the
        # default value would be taken as the object
        sampled = self.get_form_value("getSamplingDate", sample,
                                      sample.getSamplingDate())
        if not all([sampler, sampled]):
            return False
        sample.setScheduledSamplingSampler(sampler)
        sample.setSamplingDate(DateTime(sampled))
        return True
346
347
class WorkflowActionSaveAnalysesAdapter(WorkflowActionGenericAdapter):
    """Adapter in charge of "save analyses" action in Analysis Request.
    """

    def __call__(self, action, services):
        """The objects passed in are Analysis Services and the context is the
        Analysis Request

        :param action: id of the action being handled (not used here)
        :param services: Analysis Services to be assigned to the sample
        :return: whatever self.redirect / self.success returns
        """
        sample = self.context
        if not IAnalysisRequest.providedBy(sample):
            return self.redirect(message=_("No changes made"), level="warning")

        # Get form values
        form = self.request.form
        prices = form.get("Price", [None])[0]
        uids = [api.get_uid(service) for service in services]
        # Each settings record is keyed by the UID of the service, not by the
        # service object itself
        hidden = [{"uid": api.get_uid(service),
                   "hidden": self.is_hidden(service)}
                  for service in services]
        specs = [self.get_specs(service) for service in services]

        # Set new analyses to the sample
        sample.setAnalysisServicesSettings(hidden)
        sample.setAnalyses(uids, prices=prices, specs=specs)

        # Just in case new analyses have been added while the Sample was in a
        # "non-open" state (e.g. "to_be_verified")
        self.do_action("rollback_to_receive", [sample])

        # Reindex the analyses that have been added
        for analysis in sample.objectValues("Analysis"):
            analysis.reindexObject()

        # Redirect the user to success page
        return self.success([sample])

    def is_hidden(self, service):
        """Returns whether the request Hidden param for the given obj is True
        (the form value for the service's UID is "on")
        """
        uid = api.get_uid(service)
        hidden_ans = self.request.form.get("Hidden", {})
        return hidden_ans.get(uid, "") == "on"

    def get_specs(self, service):
        """Returns the analysis specs available in the request for the given
        service. Keys without a value in the request keep the default value
        set by ResultsRangeDict
        """
        uid = api.get_uid(service)
        keyword = service.getKeyword()
        specs = ResultsRangeDict(keyword=keyword, uid=uid).copy()
        # Iterate over a snapshot of the keys, since values are reassigned
        for key in list(specs.keys()):
            # Form values are read as a one-item list holding a dict keyed by
            # service UID
            specs_value = self.request.form.get(key, [{}])[0].get(uid, None)
            specs[key] = specs_value or specs.get(key)
        return specs
398