Passed
Push — 2.x ( fa98d6...054a36 )
by Jordi
04:42
created

senaite.core.browser.samples.dispatch_samples   A

Complexity

Total Complexity 32

Size/Duplication

Total Lines 165
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 32
eloc 105
dl 0
loc 165
rs 9.84
c 0
b 0
f 0

13 Methods

Rating   Name   Duplication   Size   Complexity  
A DispatchSamplesView.__init__() 0 6 1
A DispatchSamplesView.get_title() 0 5 1
B DispatchSamplesView.__call__() 0 26 7
A DispatchSamplesView.get_samples_data() 0 12 2
A DispatchSamplesView.get_object_by_uid() 0 8 2
A DispatchSamplesView.get_partitions() 0 8 3
A DispatchSamplesView.get_uids_from_request() 0 8 2
A DispatchSamplesView.get_objects_from_request() 0 5 1
A DispatchSamplesView.uniquify_items() 0 9 3
A DispatchSamplesView.get_samples() 0 23 4
A DispatchSamplesView.redirect() 0 8 3
A DispatchSamplesView.add_status_message() 0 4 1
A DispatchSamplesView.dispatch() 0 9 2
1
# -*- coding: utf-8 -*-
2
3
import collections
4
import six
5
6
from bika.lims import api
7
from bika.lims import senaiteMessageFactory as _
8
from bika.lims.browser import BrowserView
9
from bika.lims.interfaces import IAnalysisRequest
10
from bika.lims.workflow import isTransitionAllowed
11
from Products.CMFCore.WorkflowCore import WorkflowException
12
from Products.Five.browser.pagetemplatefile import ViewPageTemplateFile
13
from senaite.core import logger
14
15
16
class DispatchSamplesView(BrowserView):
    """Action URL for the sample "dispatch" transition

    Renders a confirmation form and, on submit, runs the "dispatch"
    workflow transition on the selected samples and their dispatchable
    partitions.
    """
    template = ViewPageTemplateFile("templates/dispatch_samples.pt")

    def __init__(self, context, request):
        super(DispatchSamplesView, self).__init__(context, request)
        self.context = context
        self.request = request
        self.portal = api.get_portal()
        # default target for all redirects issued by this view
        self.back_url = api.get_url(self.context)

    def __call__(self):
        """Handle the form submission or render the dispatch form
        """
        form = self.request.form

        # Form submit toggle
        form_submitted = form.get("submitted", False)
        form_dispatch = form.get("button_dispatch", False)
        form_cancel = form.get("button_cancel", False)

        # Handle dispatch
        if form_submitted and form_dispatch:
            logger.info("*** DISPATCH ***")
            comment = form.get("comment", "")
            if not comment:
                # a reason is mandatory: send the user back to the form
                return self.redirect(
                    redirect_url=self.request.getHeader("http_referer"),
                    message=_("Please specify a reason"), level="error")

            for sample in self.get_samples():
                self.dispatch(sample, comment)
            return self.redirect()

        # Handle cancel
        if form_submitted and form_cancel:
            return self.redirect(message=_("Cancelled"))

        return self.template()

    def dispatch(self, sample, comment):
        """Dispatch the sample

        :param sample: object to run the "dispatch" transition on
        :param comment: reason passed along to the workflow transition
        :returns: True if the transition succeeded, otherwise False
        """
        wf = api.get_tool("portal_workflow")
        try:
            wf.doActionFor(sample, "dispatch", comment=comment)
        except WorkflowException as exc:
            # best effort: keep going with the other samples, but leave a
            # trace instead of failing silently
            logger.warning("Could not dispatch %r: %s", sample, exc)
            return False
        return True

    def uniquify_items(self, items):
        """Uniquify the items with sort order

        :param items: iterable of items, possibly containing duplicates
        :returns: list with the first occurrence of each item, in order
        """
        unique = []
        for item in items:
            if item in unique:
                continue
            unique.append(item)
        return unique

    def get_partitions(self, sample):
        """Return dispatchable sample partitions

        :param sample: the object to get the direct descendants from
        :returns: list of direct descendants that allow the "dispatch"
                  transition; empty if `sample` is not an analysis request
        """
        if not IAnalysisRequest.providedBy(sample):
            return []
        partitions = sample.getDescendants(all_descendants=False)
        # build a real list, so the result is the same on Python 2 and 3
        return [part for part in partitions
                if isTransitionAllowed(part, "dispatch")]

    def get_samples(self):
        """Extract the samples from the request UIDs

        This might be either a samples container or a sample context

        :returns: list of unique samples followed by their dispatchable
                  partitions
        """
        samples = []

        # when coming from the samples listing
        for obj in self.get_objects_from_request():
            if IAnalysisRequest.providedBy(obj):
                samples.append(obj)
                samples.extend(self.get_partitions(obj))

        # when coming from the WF menu inside a sample
        if IAnalysisRequest.providedBy(self.context):
            samples.append(self.context)
            samples.extend(self.get_partitions(self.context))

        return self.uniquify_items(samples)

    def get_title(self, obj):
        """Return the object title as unicode
        """
        title = api.get_title(obj)
        return api.safe_unicode(title)

    def get_samples_data(self):
        """Yield an info dictionary for each sample to be dispatched

        Each dictionary holds the keys "obj", "id", "uid", "title", "url"
        and "sample_type".
        """
        for obj in self.get_samples():
            obj = api.get_object(obj)
            yield {
                "obj": obj,
                "id": api.get_id(obj),
                "uid": api.get_uid(obj),
                "title": self.get_title(obj),
                "url": api.get_url(obj),
                "sample_type": obj.getSampleTypeTitle(),
            }

    def get_objects_from_request(self):
        """Returns a list of objects coming from the "uids" request parameter

        UIDs that cannot be resolved to an object are skipped.
        """
        unique_uids = self.get_uids_from_request()
        objs = [self.get_object_by_uid(uid) for uid in unique_uids]
        # same truthiness check as the former `filter(None, ...)`
        return [obj for obj in objs if obj]

    def get_uids_from_request(self):
        """Return a list of uids from the request

        The "uids" form value may either be a comma separated string or a
        sequence. Duplicates are removed while keeping the original order.
        """
        uids = self.request.form.get("uids", "")
        if isinstance(uids, six.string_types):
            uids = uids.split(",")
        unique_uids = collections.OrderedDict().fromkeys(uids).keys()
        return [uid for uid in unique_uids if api.is_uid(uid)]

    def get_object_by_uid(self, uid):
        """Get the object by UID

        :param uid: UID of the object to look up
        :returns: the found object or None
        """
        logger.debug("get_object_by_uid::UID=%s", uid)
        obj = api.get_object_by_uid(uid, None)
        if obj is None:
            logger.warning("!! No object found for UID #%s !!", uid)
        return obj

    def redirect(self, redirect_url=None, message=None, level="info"):
        """Redirect with a message

        :param redirect_url: target URL; defaults to the URL of the context
        :param message: optional portal status message to display
        :param level: level of the status message, e.g. "info" or "error"
        """
        if redirect_url is None:
            redirect_url = self.back_url
        if message is not None:
            self.add_status_message(message, level)
        return self.request.response.redirect(redirect_url)

    def add_status_message(self, message, level="info"):
        """Set a portal status message
        """
        return self.context.plone_utils.addPortalMessage(message, level)
165