Passed
Push — 2.x ( fa98d6...054a36 )
by Jordi
04:42
created

DispatchSamplesView.get_partitions()   A

Complexity

Conditions 3

Size

Total Lines 8
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 8
rs 10
c 0
b 0
f 0
cc 3
nop 2
1
# -*- coding: utf-8 -*-
2
3
import collections
4
import six
5
6
from bika.lims import api
7
from bika.lims import senaiteMessageFactory as _
8
from bika.lims.browser import BrowserView
9
from bika.lims.interfaces import IAnalysisRequest
10
from bika.lims.workflow import isTransitionAllowed
11
from Products.CMFCore.WorkflowCore import WorkflowException
12
from Products.Five.browser.pagetemplatefile import ViewPageTemplateFile
13
from senaite.core import logger
14
15
16
class DispatchSamplesView(BrowserView):
17
    """Action URL for the sample "dispatch" transition
18
    """
19
    template = ViewPageTemplateFile("templates/dispatch_samples.pt")
20
21
    def __init__(self, context, request):
        """Initialize the view

        :param context: object this view is called on
        :param request: current request
        """
        super(DispatchSamplesView, self).__init__(context, request)
        # keep explicit references to request and context on the view
        self.request = request
        self.context = context
        # portal object and the URL used as default redirect target
        self.portal = api.get_portal()
        self.back_url = api.get_url(self.context)
27
28
    def __call__(self):
        """Process the submitted form or render the template

        A submitted form with the dispatch button dispatches all samples
        (a comment is required), a submitted form with the cancel button
        redirects back with a message. In any other case the page template
        is rendered.
        """
        form = self.request.form

        # Nothing submitted yet: render the form
        if not form.get("submitted", False):
            return self.template()

        # Handle dispatch
        if form.get("button_dispatch", False):
            logger.info("*** DISPATCH ***")
            reason = form.get("comment", "")
            if not reason:
                # the comment is mandatory: send the user back with an error
                return self.redirect(
                    redirect_url=self.request.getHeader("http_referer"),
                    message=_("Please specify a reason"), level="error")

            for sample in self.get_samples():
                self.dispatch(sample, reason)
            return self.redirect()

        # Handle cancel
        if form.get("button_cancel", False):
            return self.redirect(message=_("Cancelled"))

        return self.template()
54
55
    def dispatch(self, sample, comment):
        """Run the "dispatch" workflow transition for the sample

        :param sample: object to dispatch
        :param comment: comment passed along to the workflow action
        :returns: True if the action did not raise a `WorkflowException`,
            otherwise False
        """
        workflow_tool = api.get_tool("portal_workflow")
        try:
            workflow_tool.doActionFor(sample, "dispatch", comment=comment)
        except WorkflowException:
            return False
        return True
64
65
    def uniquify_items(self, items):
        """Drop duplicate items, keeping the order of first occurrence

        Items are compared by equality, so they do not need to be hashable.

        :param items: iterable of items
        :returns: new list without duplicates
        """
        ordered = []
        for candidate in items:
            if candidate not in ordered:
                ordered.append(candidate)
        return ordered
74
75
    def get_partitions(self, sample):
        """Return dispatchable sample partitions

        :param sample: object to get the partitions for
        :returns: list of the objects returned by
            `sample.getDescendants(all_descendants=False)` for which the
            "dispatch" transition is allowed; an empty list if the given
            object does not provide `IAnalysisRequest`
        """
        if not IAnalysisRequest.providedBy(sample):
            return []
        partitions = sample.getDescendants(all_descendants=False)
        # Build a real list: `filter` is lazy on Python 3, which would make
        # the return type differ from the empty-list branch above
        return [
            part for part in partitions
            if isTransitionAllowed(part, "dispatch")]
83
84
    def get_samples(self):
        """Collect the samples to dispatch

        The samples come from the request UIDs (samples listing) and from
        the context itself (workflow menu inside a sample). Each sample is
        followed by its dispatchable partitions.

        :returns: list of unique samples in the order they were collected
        """
        # objects from the request first, the view context last
        candidates = list(self.get_objects_from_request())
        candidates.append(self.context)

        collected = []
        for candidate in candidates:
            # anything that is not a sample is skipped
            if not IAnalysisRequest.providedBy(candidate):
                continue
            collected.append(candidate)
            collected.extend(self.get_partitions(candidate))

        return self.uniquify_items(collected)
107
108
    def get_title(self, obj):
        """Return the title of the object converted with `api.safe_unicode`

        :param obj: object to get the title from
        """
        return api.safe_unicode(api.get_title(obj))
113
114
    def get_samples_data(self):
        """Yield a data mapping for each sample returned by `get_samples`

        Each mapping holds the object itself together with its id, UID,
        title, URL and sample type title.
        """
        for item in self.get_samples():
            sample = api.get_object(item)
            info = {"obj": sample}
            info["id"] = api.get_id(sample)
            info["uid"] = api.get_uid(sample)
            info["title"] = self.get_title(sample)
            info["url"] = api.get_url(sample)
            info["sample_type"] = sample.getSampleTypeTitle()
            yield info
127
128
    def get_objects_from_request(self):
129
        """Returns a list of objects coming from the "uids" request parameter
130
        """
131
        unique_uids = self.get_uids_from_request()
132
        return filter(None, map(self.get_object_by_uid, unique_uids))
133
134
    def get_uids_from_request(self):
        """Return a list of uids from the request

        Reads the "uids" form parameter. A string value is split on commas,
        any other value is iterated as is. Duplicates are removed keeping
        the order of first occurrence.

        :returns: list of the values accepted by `api.is_uid`
        """
        uids = self.request.form.get("uids", "")
        if isinstance(uids, six.string_types):
            uids = uids.split(",")
        # ordered de-duplication of the raw values
        unique_uids = collections.OrderedDict.fromkeys(uids)
        # a real list instead of `filter`, which is lazy on Python 3
        return [uid for uid in unique_uids if api.is_uid(uid)]
142
143
    def get_object_by_uid(self, uid):
        """Get the object by UID

        :param uid: UID to look up
        :returns: result of `api.get_object_by_uid(uid, None)`; a warning
            is logged when that result is `None`
        """
        logger.debug("get_object_by_uid::UID={}".format(uid))
        obj = api.get_object_by_uid(uid, None)
        if obj is None:
            # include the UID in the message (the placeholder was previously
            # never filled in)
            logger.warning("!! No object found for UID #{} !!".format(uid))
        return obj
151
152
    def redirect(self, redirect_url=None, message=None, level="info"):
153
        """Redirect with a message
154
        """
155
        if redirect_url is None:
156
            redirect_url = self.back_url
157
        if message is not None:
158
            self.add_status_message(message, level)
159
        return self.request.response.redirect(redirect_url)
160
161
    def add_status_message(self, message, level="info"):
162
        """Set a portal status message
163
        """
164
        return self.context.plone_utils.addPortalMessage(message, level)
165