Passed
Push — master ( c474e7...9dc2fd )
by Ramon
08:26 queued 04:12
created

bika.lims.browser.analysisrequest.workflow   F

Complexity

Total Complexity 67

Size/Duplication

Total Lines 398
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 67
eloc 303
dl 0
loc 398
rs 3.04
c 0
b 0
f 0

17 Methods

Rating   Name   Duplication   Size   Complexity  
A AnalysisRequestWorkflowAction.workflow_action_print() 0 9 2
A AnalysisRequestWorkflowAction.workflow_action_republish() 0 2 1
A AnalysisRequestWorkflowAction.workflow_action_prepublish() 0 2 1
A AnalysisRequestWorkflowAction.workflow_action_submit() 0 8 2
A AnalysisRequestWorkflowAction.__call__() 0 16 5
C AnalysisRequestWorkflowAction.notify_ar_retract() 0 61 8
B AnalysisRequestWorkflowAction.workflow_action_sample() 0 21 7
B AnalysisRequestWorkflowAction.workflow_action_save_analyses_button() 0 40 5
B AnalysisRequestWorkflowAction.workflow_action_preserve() 0 22 7
A AnalysisRequestWorkflowAction.workflow_action_publish() 0 12 2
A AnalysisRequestWorkflowAction.workflow_action_create_partitions() 0 15 3
C AnalysisRequestWorkflowAction.workflow_action_schedule_sampling() 0 56 9
A AnalysisRequestWorkflowAction.workflow_action_verify() 0 7 2
A AnalysisRequestWorkflowAction.workflow_action_copy_to_new() 0 14 2
A AnalysisRequestWorkflowAction.workflow_action_invalidate() 0 26 3
A AnalysisRequestWorkflowAction.requires_partitioning() 0 8 2
B AnalysisRequestWorkflowAction.workflow_action_receive() 0 23 6

How to fix   Complexity   

Complexity

Complex classes like bika.lims.browser.analysisrequest.workflow often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE
4
#
5
# Copyright 2018 by it's authors.
6
# Some rights reserved. See LICENSE.rst, CONTRIBUTORS.rst.
7
8
from email.mime.multipart import MIMEMultipart
9
from email.mime.text import MIMEText
10
from string import Template
11
12
import plone
13
from DateTime import DateTime
14
from Products.CMFCore.utils import getToolByName
15
from Products.CMFPlone.utils import safe_unicode
16
from bika.lims import PMF, api
17
from bika.lims import bikaMessageFactory as _
18
from bika.lims import interfaces
19
from bika.lims.browser.analyses.workflow import AnalysesWorkflowAction
20
from bika.lims.browser.bika_listing import WorkflowAction
21
from bika.lims.content.analysisspec import ResultsRangeDict
22
from bika.lims.interfaces import IAnalysisRequest
23
from bika.lims.permissions import *
24
from bika.lims.utils import encode_header
25
from bika.lims.utils import isActive
26
from bika.lims.utils import t
27
from bika.lims.workflow import doActionFor
28
from email.Utils import formataddr
29
30
31
# TODO Revisit AnalysisRequestWorkflowAction class
32
# This class is not only used for workflow actions taken in AnalysisRequest
33
# context, but also for workflow actions taken in other contexts (e.g.Client or
34
# Batch) where the triggered action is for Analysis Requests selected from a
35
# listing. E.g: ClientWorkflowAction and BatchWorkflowAction.
36
class AnalysisRequestWorkflowAction(AnalysesWorkflowAction):
37
    """Workflow actions taken in AnalysisRequest context.
38
    """
39
40
    def __call__(self):
41
        form = self.request.form
42
        plone.protect.CheckAuthenticator(form)
43
        action, came_from = WorkflowAction._get_form_workflow_action(self)
44
        if type(action) in (list, tuple):
45
            action = action[0]
46
        if type(came_from) in (list, tuple):
47
            came_from = came_from[0]
48
        # Call out to the workflow action method
49
        # Use default bika_listing.py/WorkflowAction for other transitions
50
        method_name = 'workflow_action_' + action if action else ''
51
        method = getattr(self, method_name, False)
52
        if method:
53
            method()
54
        else:
55
            WorkflowAction.__call__(self)
56
57
    def notify_ar_retract(self, ar, newar):
58
        bika_setup = api.get_bika_setup()
59
        laboratory = bika_setup.laboratory
60
        lab_address = "<br/>".join(laboratory.getPrintAddress())
61
        mime_msg = MIMEMultipart('related')
62
        mime_msg['Subject'] = t(_("Erroneus result publication from ${request_id}",
63
                                mapping={"request_id": ar.getId()}))
64
        mime_msg['From'] = formataddr(
65
            (encode_header(laboratory.getName()),
66
             laboratory.getEmailAddress()))
67
        to = []
68
        contact = ar.getContact()
69
        if contact:
70
            to.append(formataddr((encode_header(contact.Title()),
71
                                  contact.getEmailAddress())))
72
        for cc in ar.getCCContact():
73
            formatted = formataddr((encode_header(cc.Title()),
74
                                   cc.getEmailAddress()))
75
            if formatted not in to:
76
                to.append(formatted)
77
78
        managers = self.context.portal_groups.getGroupMembers('LabManagers')
79
        for bcc in managers:
80
            user = self.portal.acl_users.getUser(bcc)
81
            if user:
82
                uemail = user.getProperty('email')
83
                ufull = user.getProperty('fullname')
84
                formatted = formataddr((encode_header(ufull), uemail))
85
                if formatted not in to:
86
                    to.append(formatted)
87
        mime_msg['To'] = ','.join(to)
88
        aranchor = "<a href='%s'>%s</a>" % (ar.absolute_url(),
89
                                            ar.getId())
90
        naranchor = "<a href='%s'>%s</a>" % (newar.absolute_url(),
91
                                             newar.getId())
92
        addremarks = ('addremarks' in self.request and ar.getRemarks()) and ("<br/><br/>" + _("Additional remarks:") +
93
                                                                             "<br/>" + ar.getRemarks().split("===")[1].strip() +
94
                                                                             "<br/><br/>") or ''
95
        sub_d = dict(request_link=aranchor,
96
                     new_request_link=naranchor,
97
                     remarks=addremarks,
98
                     lab_address=lab_address)
99
        body = Template("Some errors have been detected in the results report "
100
                        "published from the Sample $request_link. The Analysis "
101
                        "Request $new_request_link has been created automatically and the "
102
                        "previous has been invalidated.<br/>The possible mistake "
103
                        "has been picked up and is under investigation.<br/><br/>"
104
                        "$remarks $lab_address").safe_substitute(sub_d)
105
        msg_txt = MIMEText(safe_unicode(body).encode('utf-8'),
106
                           _subtype='html')
107
        mime_msg.preamble = 'This is a multi-part MIME message.'
108
        mime_msg.attach(msg_txt)
109
        try:
110
            host = getToolByName(self.context, 'MailHost')
111
            host.send(mime_msg.as_string(), immediate=True)
112
        except Exception as msg:
113
            message = _('Unable to send an email to alert lab '
114
                        'client contacts that the Sample has been '
115
                        'retracted: ${error}',
116
                        mapping={'error': safe_unicode(msg)})
117
            self.context.plone_utils.addPortalMessage(message, 'warning')
118
119
    def workflow_action_save_analyses_button(self):
120
        form = self.request.form
121
        # AR Manage Analyses: save Analyses
122
        ar = self.context
123
        objects = WorkflowAction._get_selected_items(self)
124
        objects_uids = objects.keys()
125
        prices = form.get("Price", [None])[0]
126
127
        # Hidden analyses?
128
        # https://jira.bikalabs.com/browse/LIMS-1324
129
        outs = []
130
        hidden_ans = form.get('Hidden', {})
131
        for uid in objects.keys():
132
            hidden = hidden_ans.get(uid, '') == "on" or False
133
            outs.append({'uid': uid, 'hidden': hidden})
134
        ar.setAnalysisServicesSettings(outs)
135
136
        specs = {}
137
        for service_uid, service in objects.items():
138
            keyword = service.getKeyword()
139
            results_range = ResultsRangeDict(keyword=keyword, uid=service_uid)
140
            results_range.update({
141
                "min": form["min"][0][service_uid],
142
                "max": form["max"][0][service_uid],
143
                "warn_min": form["warn_min"][0][service_uid],
144
                "warn_max": form["warn_max"][0][service_uid],
145
            })
146
            specs[service_uid] = results_range
147
148
        if ar.setAnalyses(objects_uids, prices=prices, specs=specs.values()):
149
            doActionFor(ar, "rollback_to_receive")
150
151
        # Reindex the analyses
152
        for analysis in ar.objectValues("Analysis"):
153
            analysis.reindexObject()
154
155
        message = PMF("Changes saved.")
156
        self.context.plone_utils.addPortalMessage(message, 'info')
157
        self.destination_url = self.context.absolute_url()
158
        self.request.response.redirect(self.destination_url)
159
160
    def workflow_action_sample(self):
161
        # TODO Workflow - Analysis Request - this should be managed by the guard
162
        if IAnalysisRequest.providedBy(self.context):
163
            objects = [{api.get_uid(self.context): self.context}]
164
        else:
165
            objects = self._get_selected_items(filter_active=True)
166
        transitioned = []
167
        for uid, ar in objects.items():
168
            sampler = self.get_form_value("Sampler", uid, default="")
169
            sampled = self.get_form_value("getDateSampled", uid, default="")
170
            if not sampler or not sampled:
171
                continue
172
            ar.setSampler(sampler)
173
            ar.setDateSampled(DateTime(sampled))
174
            success, message = doActionFor(ar, "sample")
175
            if success:
176
                transitioned.append(ar.getId())
177
        message = _("No changes made")
178
        if transitioned:
179
            message = _("Saved items: {}".format(", ".join(transitioned)))
180
        self.redirect(message=message)
181
182
    def workflow_action_preserve(self):
183
        # TODO Workflow - Analysis Request - this should be managed by the guard
184
        if IAnalysisRequest.providedBy(self.context):
185
            objects = [{api.get_uid(self.context): self.context}]
186
        else:
187
            objects = self._get_selected_items(filter_active=True,
188
                                               permissions=[PreserveSample])
189
        transitioned = []
190
        for uid, ar in objects.items():
191
            preserver = self.get_form_value("Preserver", uid, default="")
192
            preserved = self.get_form_value("getDatePreserved", uid, default="")
193
            if not preserver or not preserved:
194
                continue
195
            ar.setPreserver(preserver)
196
            ar.setDatePreserved(DateTime(preserved))
197
            success, message = doActionFor(ar, "preserve")
198
            if success:
199
                transitioned.append(ar.getId())
200
        message = _("No changes made")
201
        if transitioned:
202
            message = _("Saved items: {}".format(", ".join(transitioned)))
203
        self.redirect(message=message)
204
205
    def requires_partitioning(self, brain_or_object):
206
        """Returns whether the passed in object needs to be partitioned
207
        """
208
        obj = api.get_object(brain_or_object)
209
        if not IAnalysisRequest.providedBy(obj):
210
            return False
211
        template = obj.getTemplate()
212
        return template and template.getAutoPartition()
213
214
    def workflow_action_receive(self):
215
        action, came_from = WorkflowAction._get_form_workflow_action(self)
216
        items = [self.context,] if came_from == 'workflow_action' \
217
                else self._get_selected_items().values()
218
        trans, dest = self.submitTransition(action, came_from, items)
219
        with_partitions = filter(self.requires_partitioning, items)
220
        if with_partitions:
221
            # Redirect to the partitioning magic view
222
            back_url = self.context.absolute_url()
223
            uids = ",".join(map(api.get_uid, with_partitions))
224
            url = "{}/partition_magic?uids={}".format(back_url, uids)
225
            self.request.response.redirect(url)
226
        elif trans and 'receive' in self.context.bika_setup.getAutoPrintStickers():
227
            transitioned = [item.UID() for item in items]
228
            tmpl = self.context.bika_setup.getAutoStickerTemplate()
229
            q = "/sticker?autoprint=1&template=%s&items=" % tmpl
230
            q += ",".join(transitioned)
231
            self.request.response.redirect(self.context.absolute_url() + q)
232
        elif trans:
233
            message = PMF('Changes saved.')
234
            self.context.plone_utils.addPortalMessage(message, 'info')
235
            self.destination_url = self.context.absolute_url()
236
            self.request.response.redirect(self.destination_url)
237
238
    def workflow_action_submit(self):
239
        AnalysesWorkflowAction.workflow_action_submit(self)
240
        checkPermission = self.context.portal_membership.checkPermission
241
        if checkPermission(EditResults, self.context):
242
            self.destination_url = self.context.absolute_url() + "/manage_results"
243
        else:
244
            self.destination_url = self.context.absolute_url()
245
        self.request.response.redirect(self.destination_url)
246
247
    def workflow_action_prepublish(self):
248
        self.workflow_action_publish()
249
250
    def workflow_action_republish(self):
251
        self.workflow_action_publish()
252
253
    def workflow_action_print(self):
254
        # Calls printLastReport method for selected ARs
255
        uids = self.request.get('uids',[])
256
        uc = getToolByName(self.context, 'uid_catalog')
257
        for obj in uc(UID=uids):
258
            ar=obj.getObject()
259
            ar.printLastReport()
260
        referer = self.request.get_header("referer")
261
        self.request.response.redirect(referer)
262
263
    def workflow_action_publish(self):
264
        if not isActive(self.context):
265
            message = _('Item is inactive.')
266
            self.context.plone_utils.addPortalMessage(message, 'info')
267
            self.request.response.redirect(self.context.absolute_url())
268
            return
269
        # AR publish preview
270
        uids = self.request.form.get('uids', [self.context.UID()])
271
        items = ",".join(uids)
272
        self.request.response.redirect(
273
            self.context.portal_url() + "/analysisrequests/publish?items="
274
            + items)
275
276
    def workflow_action_verify(self):
277
        # default bika_listing.py/WorkflowAction, but then go to view screen.
278
        self.destination_url = self.context.absolute_url()
279
        action, came_from = WorkflowAction._get_form_workflow_action(self)
280
        if type(came_from) in (list, tuple):
281
            came_from = came_from[0]
282
        return self.workflow_action_default(action='verify', came_from=came_from)
283
284
    def workflow_action_invalidate(self):
285
        # AR should be retracted
286
        # Can't transition inactive ARs
287
        if not api.is_active(self.context):
288
            message = _('Item is inactive.')
289
            self.context.plone_utils.addPortalMessage(message, 'info')
290
            self.request.response.redirect(self.context.absolute_url())
291
            return
292
293
        # Retract the AR and get the retest
294
        api.do_transition_for(self.context, 'invalidate')
295
        retest = self.context.getRetest()
296
297
        # 4. The system immediately alerts the client contacts who ordered
298
        # the results, per email and SMS, that a possible mistake has been
299
        # picked up and is under investigation.
300
        # A much possible information is provided in the email, linking
301
        # to the AR online.
302
        bika_setup = api.get_bika_setup()
303
        if bika_setup.getNotifyOnARRetract():
304
            self.notify_ar_retract(self.context, retest)
305
306
        message = _('${items} invalidated.',
307
                    mapping={'items': self.context.getId()})
308
        self.context.plone_utils.addPortalMessage(message, 'warning')
309
        self.request.response.redirect(retest.absolute_url())
310
311
    def workflow_action_copy_to_new(self):
312
        # Pass the selected AR UIDs in the request, to ar_add.
313
        objects = WorkflowAction._get_selected_items(self)
314
        if not objects:
315
            message = _("No analyses have been selected")
316
            self.context.plone_utils.addPortalMessage(message, 'info')
317
            referer = self.request.get_header("referer")
318
            self.request.response.redirect(referer)
319
            return
320
        url = self.context.absolute_url() + "/ar_add" + \
321
            "?ar_count={0}".format(len(objects)) + \
322
            "&copy_from={0}".format(",".join(objects.keys()))
323
        self.request.response.redirect(url)
324
        return
325
326
    def workflow_action_schedule_sampling(self):
327
        """
328
        This function prevent the transition if the fields "SamplingDate"
329
        and "ScheduledSamplingSampler" are uncompleted.
330
        :returns: bool
331
        """
332
        from bika.lims.utils.workflow import schedulesampling
333
        message = 'Not expected transition.'
334
        # In Samples Folder we have to get each selected item
335
        if interfaces.ISamplesFolder.providedBy(self.context):
336
            select_objs = WorkflowAction._get_selected_items(self)
337
            message = _('Transition done.')
338
            for key in select_objs.keys():
339
                sample = select_objs[key]
340
                # Getting the sampler
341
                sch_sampl = self.request.form.get(
342
                    'getScheduledSamplingSampler', None)[0].get(key) if\
343
                    self.request.form.get(
344
                        'getScheduledSamplingSampler', None) else ''
345
                # Getting the date
346
                sampl_date = self.request.form.get(
347
                    'getSamplingDate', None)[0].get(key) if\
348
                    self.request.form.get(
349
                        'getSamplingDate', None) else ''
350
                # Setting both values
351
                sample.setScheduledSamplingSampler(sch_sampl)
352
                sample.setSamplingDate(sampl_date)
353
                # Transitioning the sample
354
                success, errmsg = schedulesampling.doTransition(sample)
355
                if errmsg == 'missing':
356
                    message = _(
357
                        "'Sampling date' and 'Define the Sampler for the" +
358
                        " scheduled sampling' must be completed and saved " +
359
                        "in order to schedule a sampling. Element: %s" %
360
                        sample.getId())
361
                elif errmsg == 'cant_trans':
362
                    message = _(
363
                        "The item %s can't be transitioned." % sample.getId())
364
                else:
365
                    message = _('Transition done.')
366
                self.context.plone_utils.addPortalMessage(message, 'info')
367
        else:
368
            success, errmsg = schedulesampling.doTransition(self.context)
369
            if errmsg == 'missing':
370
                message = _(
371
                    "'Sampling date' and 'Define the Sampler for the" +
372
                    " scheduled sampling' must be completed and saved in " +
373
                    "order to schedule a sampling.")
374
            elif errmsg == 'cant_trans':
375
                message = _("The item can't be transitioned.")
376
            else:
377
                message = _('Transition done.')
378
            self.context.plone_utils.addPortalMessage(message, 'info')
379
        # Reload the page in order to show the portal message
380
        self.request.response.redirect(self.context.absolute_url())
381
        return success
0 ignored issues
show
introduced by
The variable success does not seem to be defined in case the for loop on line 338 is not entered. Are you sure this can never be the case?
Loading history...
382
383
    def workflow_action_create_partitions(self):
384
        """Redirects the user to the partition magic view
385
        """
386
        uids = list()
387
        if IAnalysisRequest.providedBy(self.context):
388
            uids = [api.get_uid(self.context)]
389
        else:
390
            uids = self.get_selected_uids()
391
        if not uids:
392
            message = "No items selected".format(repr(type(self.context)))
393
            self.redirect(message=message, level="error")
394
395
        # Redirect to the partitioning magic view
396
        url = "{}/partition_magic?uids={}".format(self.back_url, ",".join(uids))
397
        self.redirect(redirect_url=url)
398