Passed
Pull Request — 2.x (#1772)
by Jordi
05:05
created

senaite.core.browser.samples.dispatch_samples   A

Complexity

Total Complexity 32

Size/Duplication

Total Lines 164
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 32
eloc 104
dl 0
loc 164
rs 9.84
c 0
b 0
f 0

13 Methods

Rating   Name   Duplication   Size   Complexity  
A DispatchSamplesView.__init__() 0 6 1
A DispatchSamplesView.get_title() 0 5 1
B DispatchSamplesView.__call__() 0 26 7
A DispatchSamplesView.get_samples_data() 0 12 2
A DispatchSamplesView.get_object_by_uid() 0 8 2
A DispatchSamplesView.get_partitions() 0 8 3
A DispatchSamplesView.get_uids_from_request() 0 8 2
A DispatchSamplesView.get_objects_from_request() 0 5 1
A DispatchSamplesView.uniquify_items() 0 9 3
A DispatchSamplesView.get_samples() 0 23 4
A DispatchSamplesView.redirect() 0 8 3
A DispatchSamplesView.add_status_message() 0 4 1
A DispatchSamplesView.dispatch() 0 9 2
1
# -*- coding: utf-8 -*-
2
3
import collections
4
5
from bika.lims import api
6
from bika.lims import senaiteMessageFactory as _
7
from bika.lims.browser import BrowserView
8
from bika.lims.interfaces import IAnalysisRequest
9
from bika.lims.workflow import isTransitionAllowed
10
from Products.CMFCore.WorkflowCore import WorkflowException
11
from Products.Five.browser.pagetemplatefile import ViewPageTemplateFile
12
from senaite.core import logger
13
14
15
class DispatchSamplesView(BrowserView):
16
    """Action URL for the sample "dispatch" transition
17
    """
18
    template = ViewPageTemplateFile("templates/dispatch_samples.pt")
19
20
    def __init__(self, context, request):
        """Set up the view for the given context and request

        Besides the regular view initialization, the URL of the context
        and the portal object are remembered on the view. The context URL
        is the default target used by `redirect`.
        """
        super(DispatchSamplesView, self).__init__(context, request)
        self.context, self.request = context, request
        # default redirect target when no explicit URL is passed
        self.back_url = api.get_url(self.context)
        self.portal = api.get_portal()
26
27
    def __call__(self):
        """Render the dispatch form or process its submission

        A submitted form with the dispatch button pressed dispatches all
        samples returned by `get_samples`, provided that a comment was
        entered. A submitted form with the cancel button pressed only
        redirects back with a notice. Any other request renders the
        template.
        """
        form = self.request.form

        # nothing was submitted: show the form
        if not form.get("submitted", False):
            return self.template()

        # Handle dispatch
        if form.get("button_dispatch", False):
            logger.info("*** DISPATCH ***")
            reason = form.get("comment", "")
            if not reason:
                # a reason is mandatory: send the user back where he came from
                referer = self.request.getHeader("http_referer")
                return self.redirect(
                    redirect_url=referer,
                    message=_("Please specify a reason"),
                    level="error")
            for sample in self.get_samples():
                self.dispatch(sample, reason)
            return self.redirect()

        # Handle cancel
        if form.get("button_cancel", False):
            return self.redirect(message=_("Cancelled"))

        # submitted without one of the known buttons
        return self.template()
53
54
    def dispatch(self, sample, comment):
        """Dispatch the sample

        Runs the "dispatch" workflow transition on the sample and passes
        the comment along to the workflow tool.

        :param sample: object to run the transition on
        :param comment: reason entered by the user
        :returns: True if the transition was performed, False if the
            workflow tool raised a `WorkflowException`
        """
        wf = api.get_tool("portal_workflow")
        try:
            wf.doActionFor(sample, "dispatch", comment=comment)
        except WorkflowException as exc:
            # Do not abort the remaining samples, but leave a trace of the
            # failure instead of swallowing it silently
            logger.warning("Could not dispatch %r: %r", sample, exc)
            return False
        return True
63
64
    def uniquify_items(self, items):
        """Return the items without duplicates

        The first occurrence of each item is kept and the original order
        is preserved. Items are compared by equality, so they do not need
        to be hashable.
        """
        kept = []
        for candidate in items:
            if candidate not in kept:
                kept.append(candidate)
        return kept
73
74
    def get_partitions(self, sample):
        """Return the partitions of the sample that can be dispatched

        An object that does not provide `IAnalysisRequest` has no
        partitions and yields an empty list.
        """
        if IAnalysisRequest.providedBy(sample):

            def is_dispatchable(partition):
                return isTransitionAllowed(partition, "dispatch")

            # NOTE(review): all_descendants=False presumably limits the
            # result to the direct partitions -- confirm in getDescendants
            descendants = sample.getDescendants(all_descendants=False)
            return filter(is_dispatchable, descendants)
        return []
82
83
    def get_samples(self):
        """Collect the samples to dispatch

        The samples are taken from the objects of the request UIDs (when
        coming from the samples listing) and from the context (when coming
        from the WF menu inside a sample). Each sample is followed by its
        dispatchable partitions and duplicates are removed.
        """
        # objects from the request first, the context always comes last
        candidates = list(self.get_objects_from_request())
        candidates.append(self.context)

        collected = []
        for candidate in candidates:
            # skip everything that is not a sample, e.g. a samples container
            if not IAnalysisRequest.providedBy(candidate):
                continue
            collected.append(candidate)
            collected.extend(self.get_partitions(candidate))

        return self.uniquify_items(collected)
106
107
    def get_title(self, obj):
        """Return the title of the object, passed through `api.safe_unicode`
        """
        return api.safe_unicode(api.get_title(obj))
112
113
    def get_samples_data(self):
        """Generate the data of the samples to dispatch

        Yields one dictionary per sample returned by `get_samples`, with
        the keys "obj", "id", "uid", "title", "url" and "sample_type".
        """
        for sample in self.get_samples():
            sample = api.get_object(sample)
            info = {
                "obj": sample,
                "id": api.get_id(sample),
                "uid": api.get_uid(sample),
                "title": self.get_title(sample),
                "url": api.get_url(sample),
                "sample_type": sample.getSampleTypeTitle(),
            }
            yield info
126
127
    def get_objects_from_request(self):
        """Return the objects for the UIDs of the "uids" request parameter

        UIDs for which `get_object_by_uid` returns None are skipped.

        :returns: list of objects, in the order of the request UIDs
        """
        objects = []
        for uid in self.get_uids_from_request():
            obj = self.get_object_by_uid(uid)
            # Compare against None explicitly: an object that evaluates to
            # False (e.g. one defining `__len__`) is still a valid hit
            if obj is not None:
                objects.append(obj)
        return objects
132
133
    def get_uids_from_request(self):
        """Return the unique UIDs of the "uids" request parameter

        The parameter may be a comma separated string or a sequence of
        UIDs. Duplicates are dropped, the order of first occurrence is
        kept and values rejected by `api.is_uid` are filtered out.

        :returns: list of UIDs
        """
        # `basestring` only exists on Python 2 and is flagged as undefined
        # by static analysis; fall back to `str` so the check works on both
        try:
            string_types = basestring  # noqa: F821
        except NameError:
            string_types = str

        uids = self.request.form.get("uids", "")
        if isinstance(uids, string_types):
            uids = uids.split(",")
        # an ordered dict removes duplicates while keeping the order
        unique_uids = collections.OrderedDict.fromkeys(uids)
        return [uid for uid in unique_uids if api.is_uid(uid)]
141
142
    def get_object_by_uid(self, uid):
        """Get the object by UID

        :param uid: UID to look up
        :returns: the object returned by `api.get_object_by_uid`, or None
            if the lookup fell back to its default
        """
        logger.debug("get_object_by_uid::UID={}".format(uid))
        obj = api.get_object_by_uid(uid, None)
        if obj is None:
            # the UID was never formatted into the message before, so the
            # log only showed the literal "{}" placeholder
            logger.warning("!! No object found for UID #{} !!".format(uid))
        return obj
150
151
    def redirect(self, redirect_url=None, message=None, level="info"):
        """Redirect the response, optionally with a status message

        Without an explicit `redirect_url`, the redirect goes to
        `self.back_url`. A message is only added if one is given.
        """
        target = self.back_url if redirect_url is None else redirect_url
        if message is not None:
            self.add_status_message(message, level)
        response = self.request.response
        return response.redirect(target)
159
160
    def add_status_message(self, message, level="info"):
        """Add a portal status message with the given level

        Delegates to `addPortalMessage` of the context's `plone_utils`.
        """
        plone_utils = self.context.plone_utils
        return plone_utils.addPortalMessage(message, level)
164