Passed
Push — master ( ddf457...c18102 )
by Ramon
05:18
created

AnalysisRequestWorkflowAction.get_specs()   A

Complexity

Conditions 2

Size

Total Lines 14
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 10
dl 0
loc 14
rs 9.9
c 0
b 0
f 0
cc 2
nop 2
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE
4
#
5
# Copyright 2018 by its authors.
6
# Some rights reserved. See LICENSE.rst, CONTRIBUTORS.rst.
7
8
from email.mime.multipart import MIMEMultipart
9
from email.mime.text import MIMEText
10
from string import Template
11
12
import plone
13
from DateTime import DateTime
14
from Products.CMFCore.utils import getToolByName
15
from Products.CMFPlone.utils import safe_unicode
16
from bika.lims import PMF, api
17
from bika.lims import bikaMessageFactory as _
18
from bika.lims import interfaces
19
from bika.lims.browser.analyses.workflow import AnalysesWorkflowAction
20
from bika.lims.browser.bika_listing import WorkflowAction
21
from bika.lims.content.analysisspec import ResultsRangeDict
22
from bika.lims.interfaces import IAnalysisRequest
23
from bika.lims.permissions import *
24
from bika.lims.utils import encode_header
25
from bika.lims.utils import isActive
26
from bika.lims.utils import t
27
from bika.lims.workflow import doActionFor
28
from email.Utils import formataddr
29
30
31
# TODO Revisit AnalysisRequestWorkflowAction class
32
# This class is not only used for workflow actions taken in AnalysisRequest
33
# context, but also for workflow actions taken in other contexts (e.g. Client or
34
# Batch) where the triggered action is for Analysis Requests selected from a
35
# listing. E.g: ClientWorkflowAction and BatchWorkflowAction.
36
class AnalysisRequestWorkflowAction(AnalysesWorkflowAction):
37
    """Workflow actions taken in AnalysisRequest context.
38
    """
39
40
    def __call__(self):
        """Dispatches the submitted workflow action to its handler.

        Looks for a method of this view named ``workflow_action_<action>`` and
        calls it. If no such method is found, the call is delegated to
        ``WorkflowAction.__call__``.
        """
        plone.protect.CheckAuthenticator(self.request.form)
        action, came_from = WorkflowAction._get_form_workflow_action(self)
        # Values may come wrapped in a list or tuple: keep the first item only
        if type(action) in (list, tuple):
            action = action[0]
        if type(came_from) in (list, tuple):
            came_from = came_from[0]

        # Resolve the specific handler for the action (if any)
        handler_name = ''
        if action:
            handler_name = 'workflow_action_' + action
        handler = getattr(self, handler_name, False)
        if not handler:
            # Use default bika_listing.py/WorkflowAction for other transitions
            WorkflowAction.__call__(self)
            return
        handler()
56
57
    def notify_ar_retract(self, ar, newar):
        """Sends an email notifying that an Analysis Request was invalidated.

        Builds an HTML message that links to both the invalidated request and
        the request created to replace it, and sends it through the portal's
        MailHost. Recipients are the contact of the request, its CC contacts
        and the members of the 'LabManagers' group. Errors raised while
        sending are not propagated: they are reported to the user as a portal
        warning message instead.

        :param ar: the Analysis Request that has been invalidated
        :param newar: the Analysis Request created to replace `ar`
        """
        bika_setup = api.get_bika_setup()
        laboratory = bika_setup.laboratory
        lab_address = "<br/>".join(laboratory.getPrintAddress())
        mime_msg = MIMEMultipart('related')
        mime_msg['Subject'] = t(_("Erroneus result publication from ${request_id}",
                                mapping={"request_id": ar.getId()}))
        mime_msg['From'] = formataddr(
            (encode_header(laboratory.getName()),
             laboratory.getEmailAddress()))

        # Collect the recipients, without duplicates
        to = []
        contact = ar.getContact()
        if contact:
            to.append(formataddr((encode_header(contact.Title()),
                                  contact.getEmailAddress())))
        for cc in ar.getCCContact():
            formatted = formataddr((encode_header(cc.Title()),
                                   cc.getEmailAddress()))
            if formatted not in to:
                to.append(formatted)

        managers = self.context.portal_groups.getGroupMembers('LabManagers')
        for bcc in managers:
            user = self.portal.acl_users.getUser(bcc)
            if user:
                uemail = user.getProperty('email')
                ufull = user.getProperty('fullname')
                formatted = formataddr((encode_header(ufull), uemail))
                if formatted not in to:
                    to.append(formatted)
        mime_msg['To'] = ','.join(to)

        aranchor = "<a href='%s'>%s</a>" % (ar.absolute_url(),
                                            ar.getId())
        naranchor = "<a href='%s'>%s</a>" % (newar.absolute_url(),
                                             newar.getId())

        # Remarks are only included when 'addremarks' is present in the
        # request. The text between the first and second "===" separators is
        # used; when the remarks have no separator at all, the whole text is
        # used instead of failing with an IndexError.
        addremarks = ''
        remarks = ar.getRemarks() if 'addremarks' in self.request else ''
        if remarks:
            chunks = remarks.split("===")
            remarks_text = chunks[1] if len(chunks) > 1 else chunks[0]
            addremarks = ("<br/><br/>" + _("Additional remarks:") +
                          "<br/>" + remarks_text.strip() +
                          "<br/><br/>")

        sub_d = dict(request_link=aranchor,
                     new_request_link=naranchor,
                     remarks=addremarks,
                     lab_address=lab_address)
        body = Template("Some errors have been detected in the results report "
                        "published from the Sample $request_link. The Analysis "
                        "Request $new_request_link has been created automatically and the "
                        "previous has been invalidated.<br/>The possible mistake "
                        "has been picked up and is under investigation.<br/><br/>"
                        "$remarks $lab_address").safe_substitute(sub_d)
        msg_txt = MIMEText(safe_unicode(body).encode('utf-8'),
                           _subtype='html')
        mime_msg.preamble = 'This is a multi-part MIME message.'
        mime_msg.attach(msg_txt)
        try:
            host = getToolByName(self.context, 'MailHost')
            host.send(mime_msg.as_string(), immediate=True)
        except Exception as msg:
            # Best-effort notification: a failure when sending the email must
            # not break the invalidation, so only warn the user
            message = _('Unable to send an email to alert lab '
                        'client contacts that the Sample has been '
                        'retracted: ${error}',
                        mapping={'error': safe_unicode(msg)})
            self.context.plone_utils.addPortalMessage(message, 'warning')
118
119
    def get_specs_value(self, service, specs_key, default=None):
        """Returns the value submitted in the request form for the given
        specification key (min, max, etc.) and service. If the form has no
        value for the service, returns the default value passed in
        """
        service_uid = api.get_uid(service)
        # First item of the form entry is looked up by the UID of the service
        submitted = self.request.form.get(specs_key, [{}])
        value = submitted[0].get(service_uid, None)
        return default if value is None else value
128
129
    def get_specs(self, service):
        """Returns the results range to assign to the analyses created from
        the service passed in. Starts from a ResultsRangeDict for the service
        and replaces each specs value with the one set in the request form, if
        any (otherwise the value from the ResultsRangeDict is kept).
        """
        specs = ResultsRangeDict(keyword=service.getKeyword(),
                                 uid=api.get_uid(service)).copy()
        for key in ("min", "max", "warn_min", "warn_max", "min_operator",
                    "max_operator"):
            fallback = specs.get(key, "")
            specs[key] = self.get_specs_value(service, key, fallback)
        return specs
143
144
    def workflow_action_save_analyses_button(self):
        """Handles the "save" action of the AR Manage Analyses form.

        Stores the "hidden" setting and the specs of the selected services,
        assigns the selected analyses to the Analysis Request, reindexes its
        analyses and redirects the user to the Analysis Request.
        """
        form = self.request.form
        ar = self.context
        objects = WorkflowAction._get_selected_items(self)
        objects_uids = objects.keys()
        prices = form.get("Price", [None])[0]

        # Hidden analyses?
        # https://jira.bikalabs.com/browse/LIMS-1324
        hidden_ans = form.get('Hidden', {})
        settings = [
            {'uid': uid, 'hidden': hidden_ans.get(uid, '') == "on" or False}
            for uid in objects.keys()
        ]
        ar.setAnalysisServicesSettings(settings)

        # Specs to apply for each one of the selected services
        specs = {uid: self.get_specs(service)
                 for uid, service in objects.items()}

        if ar.setAnalyses(objects_uids, prices=prices, specs=specs.values()):
            doActionFor(ar, "rollback_to_receive")

        # Reindex the analyses
        for analysis in ar.objectValues("Analysis"):
            analysis.reindexObject()

        self.context.plone_utils.addPortalMessage(
            PMF("Changes saved."), 'info')
        self.destination_url = self.context.absolute_url()
        self.request.response.redirect(self.destination_url)
176
177
    def workflow_action_sample(self):
        """Handles the "sample" action.

        Applies to the current context if it is an Analysis Request, and to
        the active items selected in the listing otherwise. For each item, the
        Sampler and the Date Sampled submitted in the form are stored and the
        "sample" transition is triggered. Items with either value missing are
        skipped. Finally, redirects with a message listing the ids of the
        items that were transitioned.
        """
        # TODO Workflow - Analysis Request - this should be managed by the guard
        if IAnalysisRequest.providedBy(self.context):
            # Must be a mapping of uid -> object (not a list), because the
            # items are iterated with .items() below
            objects = {api.get_uid(self.context): self.context}
        else:
            objects = self._get_selected_items(filter_active=True)
        transitioned = []
        for uid, ar in objects.items():
            sampler = self.get_form_value("Sampler", uid, default="")
            sampled = self.get_form_value("getDateSampled", uid, default="")
            if not sampler or not sampled:
                # Both values are required to do the transition
                continue
            ar.setSampler(sampler)
            ar.setDateSampled(DateTime(sampled))
            success = doActionFor(ar, "sample")[0]
            if success:
                transitioned.append(ar.getId())
        message = _("No changes made")
        if transitioned:
            message = _("Saved items: {}".format(", ".join(transitioned)))
        self.redirect(message=message)
198
199
    def workflow_action_preserve(self):
        """Handles the "preserve" action.

        Applies to the current context if it is an Analysis Request, and to
        the active items selected in the listing (filtered by the
        PreserveSample permission) otherwise. For each item, the Preserver and
        the Date Preserved submitted in the form are stored and the "preserve"
        transition is triggered. Items with either value missing are skipped.
        Finally, redirects with a message listing the ids of the items that
        were transitioned.
        """
        # TODO Workflow - Analysis Request - this should be managed by the guard
        if IAnalysisRequest.providedBy(self.context):
            # Must be a mapping of uid -> object (not a list), because the
            # items are iterated with .items() below
            objects = {api.get_uid(self.context): self.context}
        else:
            objects = self._get_selected_items(filter_active=True,
                                               permissions=[PreserveSample])
        transitioned = []
        for uid, ar in objects.items():
            preserver = self.get_form_value("Preserver", uid, default="")
            preserved = self.get_form_value("getDatePreserved", uid, default="")
            if not preserver or not preserved:
                # Both values are required to do the transition
                continue
            ar.setPreserver(preserver)
            ar.setDatePreserved(DateTime(preserved))
            success = doActionFor(ar, "preserve")[0]
            if success:
                transitioned.append(ar.getId())
        message = _("No changes made")
        if transitioned:
            message = _("Saved items: {}".format(", ".join(transitioned)))
        self.redirect(message=message)
221
222
    def requires_partitioning(self, brain_or_object):
        """Returns a truthy value if the object passed in is an Analysis
        Request with a template assigned that has auto-partition enabled
        """
        obj = api.get_object(brain_or_object)
        if IAnalysisRequest.providedBy(obj):
            template = obj.getTemplate()
            if not template:
                # No template assigned: hand back the falsy value as it is
                return template
            return template.getAutoPartition()
        return False
230
231
    def workflow_action_receive(self):
        """Handles the "receive" action.

        Submits the transition for the current context or for the items
        selected in the listing, and then redirects to (first match wins):
        the partition magic view if any item requires partitioning, the
        stickers view if stickers are auto-printed on receive, or the current
        context with a "Changes saved." message.
        """
        action, came_from = WorkflowAction._get_form_workflow_action(self)
        if came_from == 'workflow_action':
            items = [self.context]
        else:
            items = self._get_selected_items().values()
        trans, dest = self.submitTransition(action, came_from, items)

        to_partition = filter(self.requires_partitioning, items)
        if to_partition:
            # Redirect to the partitioning magic view
            base_url = self.context.absolute_url()
            joined_uids = ",".join(map(api.get_uid, to_partition))
            self.request.response.redirect(
                "{}/partition_magic?uids={}".format(base_url, joined_uids))
            return

        if not trans:
            # Nothing transitioned, nothing else to do
            return

        if 'receive' in self.context.bika_setup.getAutoPrintStickers():
            # Redirect to the stickers view for automatic printing
            item_uids = [item.UID() for item in items]
            template = self.context.bika_setup.getAutoStickerTemplate()
            query = "/sticker?autoprint=1&template=%s&items=" % template
            query += ",".join(item_uids)
            self.request.response.redirect(self.context.absolute_url() + query)
            return

        self.context.plone_utils.addPortalMessage(
            PMF('Changes saved.'), 'info')
        self.destination_url = self.context.absolute_url()
        self.request.response.redirect(self.destination_url)
254
255
    def workflow_action_submit(self):
        """Handles the "submit" action by calling the handler from
        AnalysesWorkflowAction and then redirecting to the manage results view
        of the context if the user has the EditResults permission, or to the
        default view of the context otherwise.
        """
        AnalysesWorkflowAction.workflow_action_submit(self)
        membership = self.context.portal_membership
        can_edit = membership.checkPermission(EditResults, self.context)
        destination = self.context.absolute_url()
        if can_edit:
            destination = destination + "/manage_results"
        self.destination_url = destination
        self.request.response.redirect(self.destination_url)
263
264
    def workflow_action_prepublish(self):
        """Handles the "prepublish" action the same way as "publish"
        """
        self.workflow_action_publish()
266
267
    def workflow_action_republish(self):
        """Handles the "republish" action the same way as "publish"
        """
        self.workflow_action_publish()
269
270
    def workflow_action_print(self):
        """Calls printLastReport for each one of the objects with the UIDs
        passed in the request ('uids') and redirects back to the referer
        """
        requested_uids = self.request.get('uids', [])
        uid_catalog = getToolByName(self.context, 'uid_catalog')
        for brain in uid_catalog(UID=requested_uids):
            brain.getObject().printLastReport()
        self.request.response.redirect(self.request.get_header("referer"))
279
280
    def workflow_action_publish(self):
        """Redirects to the publish preview for the UIDs submitted in the form
        ('uids'), or for the current context if none were submitted. If the
        context is not active, redirects to the context with a message instead
        """
        context = self.context
        response = self.request.response
        if not isActive(context):
            context.plone_utils.addPortalMessage(_('Item is inactive.'), 'info')
            response.redirect(context.absolute_url())
            return

        # AR publish preview
        uids = self.request.form.get('uids', [context.UID()])
        items = ",".join(uids)
        preview_url = context.portal_url() + \
            "/analysisrequests/publish?items=" + items
        response.redirect(preview_url)
292
293
    def workflow_action_verify(self):
        """Handles the "verify" action with the default handler, but sets the
        view of the current context as the destination url first
        """
        # default bika_listing.py/WorkflowAction, but then go to view screen.
        self.destination_url = self.context.absolute_url()
        form_action, origin = WorkflowAction._get_form_workflow_action(self)
        if type(origin) in (list, tuple):
            # Keep the first value only
            origin = origin[0]
        return self.workflow_action_default(action='verify', came_from=origin)
300
301
    def workflow_action_invalidate(self):
        """Invalidates the current context and redirects to its retest.

        If the context is not active, nothing is invalidated and the user is
        redirected to the context with a message. Otherwise, the "invalidate"
        transition is triggered and, if enabled in the setup, an email
        notification is sent through notify_ar_retract.
        """
        context = self.context
        response = self.request.response

        # Can't transition inactive ARs
        if not api.is_active(context):
            context.plone_utils.addPortalMessage(_('Item is inactive.'), 'info')
            response.redirect(context.absolute_url())
            return

        # Retract the AR and get the retest
        api.do_transition_for(context, 'invalidate')
        retest = context.getRetest()

        # The client contacts who ordered the results are alerted by email
        # that a possible mistake has been picked up and is under
        # investigation. As much information as possible is provided in the
        # email, linking to the AR online.
        if api.get_bika_setup().getNotifyOnARRetract():
            self.notify_ar_retract(context, retest)

        feedback = _('${items} invalidated.',
                     mapping={'items': context.getId()})
        context.plone_utils.addPortalMessage(feedback, 'warning')
        response.redirect(retest.absolute_url())
327
328
    def workflow_action_copy_to_new(self):
        """Redirects to the ar_add view of the context, passing the UIDs of the
        selected items as the objects to copy from. If there are no items
        selected, redirects back to the referer with a message
        """
        selected = WorkflowAction._get_selected_items(self)
        if selected:
            # Pass the selected AR UIDs in the request, to ar_add.
            url = "".join([
                self.context.absolute_url(),
                "/ar_add",
                "?ar_count={0}".format(len(selected)),
                "&copy_from={0}".format(",".join(selected.keys())),
            ])
            self.request.response.redirect(url)
            return

        self.context.plone_utils.addPortalMessage(
            _("No analyses have been selected"), 'info')
        self.request.response.redirect(self.request.get_header("referer"))
        return
342
343
    def workflow_action_schedule_sampling(self):
        """
        Handles the "schedule sampling" action. The transition is delegated to
        schedulesampling.doTransition, and a portal message is added with the
        outcome reported for each object. In a Samples folder, the sampler and
        sampling date submitted in the form are stored in each selected item
        before the transition is attempted; otherwise the transition is
        attempted against the current context.

        :returns: the success value of the last transition attempted, or False
            if no transition was attempted (no items selected)
        """
        from bika.lims.utils.workflow import schedulesampling
        # Stays False if the loop below is not entered (nothing selected),
        # instead of failing with an UnboundLocalError on return
        success = False
        # In Samples Folder we have to get each selected item
        if interfaces.ISamplesFolder.providedBy(self.context):
            form = self.request.form
            # Form entries are looked up once; the value for each item is
            # taken from the first element of the entry, keyed by item key
            samplers = form.get('getScheduledSamplingSampler', None)
            dates = form.get('getSamplingDate', None)
            select_objs = WorkflowAction._get_selected_items(self)
            for key, sample in select_objs.items():
                # Getting the sampler
                sch_sampl = samplers[0].get(key) if samplers else ''
                # Getting the date
                sampl_date = dates[0].get(key) if dates else ''
                # Setting both values
                sample.setScheduledSamplingSampler(sch_sampl)
                sample.setSamplingDate(sampl_date)
                # Transitioning the sample
                success, errmsg = schedulesampling.doTransition(sample)
                if errmsg == 'missing':
                    message = _(
                        "'Sampling date' and 'Define the Sampler for the" +
                        " scheduled sampling' must be completed and saved " +
                        "in order to schedule a sampling. Element: %s" %
                        sample.getId())
                elif errmsg == 'cant_trans':
                    message = _(
                        "The item %s can't be transitioned." % sample.getId())
                else:
                    message = _('Transition done.')
                self.context.plone_utils.addPortalMessage(message, 'info')
        else:
            success, errmsg = schedulesampling.doTransition(self.context)
            if errmsg == 'missing':
                message = _(
                    "'Sampling date' and 'Define the Sampler for the" +
                    " scheduled sampling' must be completed and saved in " +
                    "order to schedule a sampling.")
            elif errmsg == 'cant_trans':
                message = _("The item can't be transitioned.")
            else:
                message = _('Transition done.')
            self.context.plone_utils.addPortalMessage(message, 'info')
        # Reload the page in order to show the portal message
        self.request.response.redirect(self.context.absolute_url())
        return success
0 ignored issues
show
introduced by
The variable success does not seem to be defined in case the for loop on line 355 is not entered. Are you sure this can never be the case?
Loading history...
399
400
    def workflow_action_create_partitions(self):
        """Redirects the user to the partition magic view

        The UIDs passed to the view are the UID of the current context if it
        is an Analysis Request, or the UIDs of the selected items otherwise.
        If there are no UIDs, redirects with an error message instead.
        """
        if IAnalysisRequest.providedBy(self.context):
            uids = [api.get_uid(self.context)]
        else:
            uids = self.get_selected_uids()
        if not uids:
            message = "No items selected"
            self.redirect(message=message, level="error")
            # Stop here: otherwise the redirect below would be issued as well,
            # pointing to the partition magic view without uids
            return

        # Redirect to the partitioning magic view
        url = "{}/partition_magic?uids={}".format(self.back_url, ",".join(uids))
        self.redirect(redirect_url=url)
415