Passed
Push — 2.x ( c3bd1e...f7830e )
by Jordi
06:55
created

bika.lims.workflow.ActionHandlerPool.succeed()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 5
rs 10
c 0
b 0
f 0
cc 1
nop 3
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2024 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import sys
22
23
import six
24
from AccessControl.SecurityInfo import ModuleSecurityInfo
25
from bika.lims import PMF
26
from bika.lims import api
27
from bika.lims import logger
28
from bika.lims.browser import ulocalized_time
29
from bika.lims.interfaces import IGuardAdapter
30
from bika.lims.interfaces import IJSONReadExtender
31
from bika.lims.jsonapi import get_include_fields
32
from Products.CMFCore.utils import getToolByName
33
from Products.CMFCore.WorkflowCore import WorkflowException
34
from senaite.core.i18n import translate as t
35
from zope.component import getAdapters
36
from zope.interface import implements
37
38
security = ModuleSecurityInfo('bika.lims.workflow')
39
security.declarePublic('guard_handler')
40
41
_marker = object()
42
43
44
def skip(instance, action, peek=False, unskip=False):
45
    """Returns True if the transition is to be SKIPPED
46
47
        peek - True just checks the value, does not set.
48
        unskip - remove skip key (for manual overrides).
49
50
    called with only (instance, action_id), this will set the request variable
51
    preventing the cascade's from re-transitioning the object and return None.
52
    """
53
54
    uid = callable(instance.UID) and instance.UID() or instance.UID
55
    skipkey = "%s_%s" % (uid, action)
56
    if 'workflow_skiplist' not in instance.REQUEST:
57
        if not peek and not unskip:
58
            instance.REQUEST['workflow_skiplist'] = [skipkey, ]
59
    else:
60
        if skipkey in instance.REQUEST['workflow_skiplist']:
61
            if unskip:
62
                instance.REQUEST['workflow_skiplist'].remove(skipkey)
63
            else:
64
                return True
65
        else:
66
            if not peek and not unskip:
67
                instance.REQUEST["workflow_skiplist"].append(skipkey)
68
69
70
def doActionFor(instance, action_id):
71
    """Tries to perform the transition to the instance.
72
    Object is reindexed after the transition takes place, but only if succeeds.
73
    :param instance: Object to be transitioned
74
    :param action_id: transition id
75
    :returns: True if the transition has been performed, together with message
76
    :rtype: tuple (bool,str)
77
    """
78
    if not instance:
79
        return False, ""
80
81
    if isinstance(instance, list):
82
        # TODO Workflow . Check if this is strictly necessary
83
        # This check is here because sometimes Plone creates a list
84
        # from submitted form elements.
85
        logger.warn("Got a list of obj in doActionFor!")
86
        if len(instance) > 1:
87
            logger.warn(
88
                "doActionFor is getting an instance parameter which is a list "
89
                "with more than one item. Instance: '{}', action_id: '{}'"
90
                .format(instance, action_id)
91
            )
92
93
        return doActionFor(instance=instance[0], action_id=action_id)
94
95
    succeed = False
96
    message = ""
97
    workflow = api.get_tool("portal_workflow")
98
    try:
99
        workflow.doActionFor(instance, action_id)
100
        succeed = True
101
    except WorkflowException as e:
102
        message = str(e)
103
        curr_state = getCurrentState(instance)
104
        clazz_name = instance.__class__.__name__
105
        logger.warning(
106
            "Transition '{0}' not allowed: {1} '{2}' ({3})"
107
            .format(action_id, clazz_name, instance.getId(), curr_state))
108
        logger.error(message)
109
110
    return succeed, message
111
112
113
def call_workflow_event(instance, event, after=True):
114
    """Calls the instance's workflow event
115
    """
116
    if not event.transition:
117
        return False
118
119
    portal_type = instance.portal_type
120
    wf_module = _load_wf_module('{}.events'.format(portal_type.lower()))
121
    if not wf_module:
122
        return False
123
124
    # Inspect if event_<transition_id> function exists in the module
125
    prefix = after and "after" or "before"
126
    func_name = "{}_{}".format(prefix, event.transition.id)
127
    func = getattr(wf_module, func_name, False)
128
    if not func:
129
        return False
130
131
    logger.info('WF event: {0}.events.{1}'
132
                .format(portal_type.lower(), func_name))
133
    func(instance)
134
    return True
135
136
137
def BeforeTransitionEventHandler(instance, event):
138
    """ This event is executed before each transition and delegates further
139
    actions to 'workflow.<portal_type>.events.before_<transition_id> function
140
    if exists for the instance passed in.
141
    :param instance: the instance to be transitioned
142
    :type instance: ATContentType
143
    :param event: event that holds the transition to be performed
144
    :type event: IObjectEvent
145
    """
146
    call_workflow_event(instance, event, after=False)
147
148
149
def AfterTransitionEventHandler(instance, event):
150
    """ This event is executed after each transition and delegates further
151
    actions to 'workflow.<portal_type>.events.after_<transition_id> function
152
    if exists for the instance passed in.
153
    :param instance: the instance that has been transitioned
154
    :type instance: ATContentType
155
    :param event: event that holds the transition performed
156
    :type event: IObjectEvent
157
    """
158
    if call_workflow_event(instance, event, after=True):
159
        return
160
161
    # Try with old AfterTransitionHandler dance...
162
    # TODO REMOVE AFTER PORTING workflow_script_*/*_transition_event
163
    if not event.transition:
164
        return
165
    # Set the request variable preventing cascade's from re-transitioning.
166
    if skip(instance, event.transition.id):
167
        return
168
    # Because at this point, the object has been transitioned already, but
169
    # further actions are probably needed still, so be sure is reindexed
170
    # before going forward.
171
    instance.reindexObject()
172
    key = 'after_{0}_transition_event'.format(event.transition.id)
173
    after_event = getattr(instance, key, False)
174
    if not after_event:
175
        # TODO Workflow. this conditional is only for backwards compatibility,
176
        # to be removed when all workflow_script_* methods in contents are
177
        # replaced by the more explicity signature 'after_*_transition_event'
178
        key = 'workflow_script_' + event.transition.id
179
        after_event = getattr(instance, key, False)
180
    if not after_event:
181
        return
182
    after_event()
183
184
185
def get_workflow_actions(obj):
186
    """ Compile a list of possible workflow transitions for this object
187
    """
188
189
    def translate(id):
190
        return t(PMF(id + "_transition_title"))
191
192
    transids = getAllowedTransitions(obj)
193
    actions = [{'id': it, 'title': translate(it)} for it in transids]
194
    return actions
195
196
197
def isTransitionAllowed(instance, transition_id):
198
    """Checks if the object can perform the transition passed in.
199
    :returns: True if transition can be performed
200
    :rtype: bool
201
    """
202
    wf_tool = getToolByName(instance, "portal_workflow")
203
    for wf_id in wf_tool.getChainFor(instance):
204
        wf = wf_tool.getWorkflowById(wf_id)
205
        if wf and wf.isActionSupported(instance, transition_id):
206
            return True
207
208
    return False
209
210
211
def getAllowedTransitions(instance):
212
    """Returns a list with the transition ids that can be performed against
213
    the instance passed in.
214
    :param instance: A content object
215
    :type instance: ATContentType
216
    :returns: A list of transition/action ids
217
    :rtype: list
218
    """
219
    wftool = getToolByName(instance, "portal_workflow")
220
    transitions = wftool.getTransitionsFor(instance)
221
    return [trans['id'] for trans in transitions]
222
223
224
def getCurrentState(obj, stateflowid='review_state'):
225
    """ The current state of the object for the state flow id specified
226
        Return empty if there's no workflow state for the object and flow id
227
    """
228
    return api.get_workflow_status_of(obj, stateflowid)
229
230
231
def in_state(obj, states, stateflowid='review_state'):
232
    """ Returns if the object passed matches with the states passed in
233
    """
234
    if not states:
235
        return False
236
    obj_state = getCurrentState(obj, stateflowid=stateflowid)
237
    return obj_state in states
238
239
240
def getTransitionActor(obj, action_id):
241
    """Returns the actor that performed a given transition. If transition has
242
    not been perormed, or current user has no privileges, returns None
243
    :return: the username of the user that performed the transition passed-in
244
    :type: string
245
    """
246
    review_history = api.get_review_history(obj)
247
    for event in review_history:
248
        if event.get('action') == action_id:
249
            return event.get('actor')
250
    return None
251
252
253
def getTransitionDate(obj, action_id, return_as_datetime=False):
254
    """
255
    Returns date of action for object. Sometimes we need this date in Datetime
256
    format and that's why added return_as_datetime param.
257
    """
258
    review_history = api.get_review_history(obj)
259
    for event in review_history:
260
        if event.get('action') == action_id:
261
            evtime = event.get('time')
262
            if return_as_datetime:
263
                return evtime
264
            if evtime:
265
                value = ulocalized_time(evtime, long_format=True,
266
                                        time_only=False, context=obj)
267
                return value
268
    return None
269
270
271
def getTransitionUsers(obj, action_id, last_user=False):
272
    """
273
    This function returns a list with the users who have done the transition.
274
    :action_id: a sring as the transition id.
275
    :last_user: a boolean to return only the last user triggering the
276
        transition or all of them.
277
    :returns: a list of user ids.
278
    """
279
    workflow = getToolByName(obj, 'portal_workflow')
280
    users = []
281
    try:
282
        # https://jira.bikalabs.com/browse/LIMS-2242:
283
        # Sometimes the workflow history is inexplicably missing!
284
        review_history = list(workflow.getInfoFor(obj, 'review_history'))
285
    except WorkflowException:
286
        logger.error(
287
            "workflow history is inexplicably missing."
288
            " https://jira.bikalabs.com/browse/LIMS-2242")
289
        return users
290
    # invert the list, so we always see the most recent matching event
291
    review_history.reverse()
292
    for event in review_history:
293
        if event.get('action', '') == action_id:
294
            value = event.get('actor', '')
295
            users.append(value)
296
            if last_user:
297
                return users
298
    return users
299
300
301
def guard_handler(instance, transition_id):
302
    """Generic workflow guard handler that returns true if the transition_id
303
    passed in can be performed to the instance passed in.
304
305
    This function is called automatically by a Script (Python) located at
306
    bika/lims/skins/guard_handler.py, which in turn is fired by Zope when an
307
    expression like "python:here.guard_handler('<transition_id>')" is set to
308
    any given guard (used by default in all bika's DC Workflow guards).
309
310
    Walks through bika.lims.workflow.<obj_type>.guards and looks for a function
311
    that matches with 'guard_<transition_id>'. If found, calls the function and
312
    returns its value (true or false). If not found, returns True by default.
313
314
    :param instance: the object for which the transition_id has to be evaluated
315
    :param transition_id: the id of the transition
316
    :type instance: ATContentType
317
    :type transition_id: string
318
    :return: true if the transition can be performed to the passed in instance
319
    :rtype: bool
320
    """
321
    if not instance:
322
        return True
323
324
    # If adapters are found, core's guard will only be evaluated if, and only
325
    # if, ALL "pre-guards" return True
326
    for name, ad in getAdapters((instance,), IGuardAdapter):
327
        if ad.guard(transition_id) is False:
328
            return False
329
330
    clazz_name = instance.portal_type
331
    # Inspect if bika.lims.workflow.<clazzname>.<guards> module exists
332
    wf_module = _load_wf_module('{0}.guards'.format(clazz_name.lower()))
333
    if not wf_module:
334
        return True
335
336
    # Inspect if guard_<transition_id> function exists in the above module
337
    key = 'guard_{0}'.format(transition_id)
338
    guard = getattr(wf_module, key, False)
339
    if not guard:
340
        return True
341
342
    return guard(instance)
343
344
345
def _load_wf_module(module_relative_name):
346
    """Loads a python module based on the module relative name passed in.
347
348
    At first, tries to get the module from sys.modules. If not found there, the
349
    function tries to load it by using importlib. Returns None if no module
350
    found or importlib is unable to load it because of errors.
351
    Eg:
352
        _load_wf_module('sample.events')
353
354
    will try to load the module 'bika.lims.workflow.sample.events'
355
356
    :param modrelname: relative name of the module to be loaded
357
    :type modrelname: string
358
    :return: the module
359
    :rtype: module
360
    """
361
    if not module_relative_name:
362
        return None
363
    if not isinstance(module_relative_name, six.string_types):
364
        return None
365
366
    rootmodname = __name__
367
    modulekey = '{0}.{1}'.format(rootmodname, module_relative_name)
368
    if modulekey in sys.modules:
369
        return sys.modules.get(modulekey, None)
370
371
    # Try to load the module recursively
372
    modname = None
373
    tokens = module_relative_name.split('.')
374
    for part in tokens:
375
        modname = '.'.join([modname, part]) if modname else part
376
        import importlib
377
        try:
378
            _module = importlib.import_module('.'+modname, package=rootmodname)
379
            if not _module:
380
                return None
381
        except Exception:
382
            return None
383
    return sys.modules.get(modulekey, None)
384
385
386
class JSONReadExtender(object):
387
388
    """- Adds the list of possible transitions to each object, if 'transitions'
389
    is specified in the include_fields.
390
    """
391
392
    implements(IJSONReadExtender)
393
394
    def __init__(self, context):
395
        self.context = context
396
397
    def __call__(self, request, data):
398
        include_fields = get_include_fields(request)
399
        if not include_fields or "transitions" in include_fields:
400
            data['transitions'] = get_workflow_actions(self.context)
401