Passed
Push — master ( 7bdfa8...29d318 )
by Ramon
05:08
created

bika.lims.workflow.wasTransitionPerformed()   A

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 1
nop 2
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE
4
#
5
# Copyright 2018 by it's authors.
6
# Some rights reserved. See LICENSE.rst, CONTRIBUTORS.rst.
7
8
import collections
9
import sys
10
11
from AccessControl.SecurityInfo import ModuleSecurityInfo
12
from Products.CMFCore.WorkflowCore import WorkflowException
13
from Products.CMFCore.utils import getToolByName
14
from bika.lims import PMF
15
from bika.lims import api
16
from bika.lims import logger
17
from bika.lims.browser import ulocalized_time
18
from bika.lims.interfaces import IJSONReadExtender
19
from bika.lims.jsonapi import get_include_fields
20
from bika.lims.utils import changeWorkflowState
21
from bika.lims.utils import t
22
from bika.lims.workflow.indexes import ACTIONS_TO_INDEXES
23
from zope.interface import implements
24
25
security = ModuleSecurityInfo('bika.lims.workflow')
26
security.declarePublic('guard_handler')
27
28
_marker = object()
29
30
def skip(instance, action, peek=False, unskip=False):
31
    """Returns True if the transition is to be SKIPPED
32
33
        peek - True just checks the value, does not set.
34
        unskip - remove skip key (for manual overrides).
35
36
    called with only (instance, action_id), this will set the request variable preventing the
37
    cascade's from re-transitioning the object and return None.
38
    """
39
40
    uid = callable(instance.UID) and instance.UID() or instance.UID
41
    skipkey = "%s_%s" % (uid, action)
42
    if 'workflow_skiplist' not in instance.REQUEST:
43
        if not peek and not unskip:
44
            instance.REQUEST['workflow_skiplist'] = [skipkey, ]
45
    else:
46
        if skipkey in instance.REQUEST['workflow_skiplist']:
47
            if unskip:
48
                instance.REQUEST['workflow_skiplist'].remove(skipkey)
49
            else:
50
                return True
51
        else:
52
            if not peek and not unskip:
53
                instance.REQUEST["workflow_skiplist"].append(skipkey)
54
55
56
def doActionFor(instance, action_id, idxs=None):
57
    """Tries to perform the transition to the instance.
58
    Object is reindexed after the transition takes place, but only if succeeds.
59
    If idxs is set, only these indexes will be reindexed. Otherwise, will try
60
    to use the indexes defined in ACTIONS_TO_INDEX mapping if any.
61
    :param instance: Object to be transitioned
62
    :param action_id: transition id
63
    :param idxs: indexes to be reindexed after the transition
64
    :returns: True if the transition has been performed, together with message
65
    :rtype: tuple (bool,str)
66
    """
67
    if not instance:
68
        return False, ""
69
70
    if isinstance(instance, list):
71
        # TODO Workflow . Check if this is strictly necessary
72
        # This check is here because sometimes Plone creates a list
73
        # from submitted form elements.
74
        logger.warn("Got a list of obj in doActionFor!")
75
        if len(instance) > 1:
76
            logger.warn(
77
                "doActionFor is getting an instance parameter which is a list "
78
                "with more than one item. Instance: '{}', action_id: '{}'"
79
                .format(instance, action_id)
80
            )
81
82
        return doActionFor(instance=instance[0], action_id=action_id, idxs=idxs)
83
84
    # Since a given transition can cascade or promote to other objects, we want
85
    # to reindex all objects for which the transition succeed at once, at the
86
    # end of process. Otherwise, same object will be reindexed multiple times
87
    # unnecessarily. Also, ActionsHandlerPool ensures the same transition is not
88
    # applied twice to the same object due to cascade/promote recursions.
89
    pool = ActionHandlerPool.get_instance()
90
    if pool.succeed(instance, action_id):
91
        return False, "Transition {} for {} already done"\
92
             .format(action_id, instance.getId())
93
94
    # Return False if transition is not permitted
95
    if not isTransitionAllowed(instance, action_id):
96
        return False, "Transition {} for {} is not allowed"\
97
            .format(action_id, instance.getId())
98
99
    # Add this batch process to the queue
100
    pool.queue_pool()
101
    succeed = False
102
    message = ""
103
    workflow = getToolByName(instance, "portal_workflow")
104
    try:
105
        workflow.doActionFor(instance, action_id)
106
        succeed = True
107
    except WorkflowException as e:
108
        message = str(e)
109
        curr_state = getCurrentState(instance)
110
        clazz_name = instance.__class__.__name__
111
        logger.warning(
112
            "Transition '{0}' not allowed: {1} '{2}' ({3})"\
113
            .format(action_id, clazz_name, instance.getId(), curr_state))
114
        logger.error(message)
115
116
    # If no indexes to reindex have been defined, try to use those defined in
117
    # the ACTIONS_TO_INDEXES mapping. Reindexing only those indexes that might
118
    # be affected by the transition boosts the overall performance!.
119
    if idxs is None:
120
        portal_type = instance.portal_type
121
        idxs = ACTIONS_TO_INDEXES.get(portal_type, {}).get(action_id, [])
122
123
    # Add the current object to the pool and resume
124
    pool.push(instance, action_id, succeed, idxs=idxs)
125
    pool.resume()
126
127
    return succeed, message
128
129
130
def call_workflow_event(instance, event, after=True):
131
    """Calls the instance's workflow event
132
    """
133
    if not event.transition:
134
        return False
135
136
    portal_type = instance.portal_type
137
    wf_module = _load_wf_module('{}.events'.format(portal_type.lower()))
138
    if not wf_module:
139
        return False
140
141
    # Inspect if event_<transition_id> function exists in the module
142
    prefix = after and "after" or "before"
143
    func_name = "{}_{}".format(prefix, event.transition.id)
144
    func = getattr(wf_module, func_name, False)
145
    if not func:
146
        return False
147
148
    logger.info('WF event: {0}.events.{1}'
149
                .format(portal_type.lower(), func_name))
150
    func(instance)
151
    return True
152
153
154
def BeforeTransitionEventHandler(instance, event):
155
    """ This event is executed before each transition and delegates further
156
    actions to 'workflow.<portal_type>.events.before_<transition_id> function
157
    if exists for the instance passed in.
158
    :param instance: the instance to be transitioned
159
    :type instance: ATContentType
160
    :param event: event that holds the transition to be performed
161
    :type event: IObjectEvent
162
    """
163
    call_workflow_event(instance, event, after=False)
164
165
166
def AfterTransitionEventHandler(instance, event):
167
    """ This event is executed after each transition and delegates further
168
    actions to 'workflow.<portal_type>.events.after_<transition_id> function
169
    if exists for the instance passed in.
170
    :param instance: the instance that has been transitioned
171
    :type instance: ATContentType
172
    :param event: event that holds the transition performed
173
    :type event: IObjectEvent
174
    """
175
    if call_workflow_event(instance, event, after=True):
176
        return
177
178
    # Try with old AfterTransitionHandler dance...
179
    # TODO CODE TO BE REMOVED AFTER PORTING workflow_script_*/*_transition_event
180
    if not event.transition:
181
        return
182
    # Set the request variable preventing cascade's from re-transitioning.
183
    if skip(instance, event.transition.id):
184
        return
185
    # Because at this point, the object has been transitioned already, but
186
    # further actions are probably needed still, so be sure is reindexed
187
    # before going forward.
188
    instance.reindexObject()
189
    key = 'after_{0}_transition_event'.format(event.transition.id)
190
    after_event = getattr(instance, key, False)
191
    if not after_event:
192
        # TODO Workflow. this conditional is only for backwards compatibility,
193
        # to be removed when all workflow_script_* methods in contents are
194
        # replaced by the more explicity signature 'after_*_transition_event'
195
        key = 'workflow_script_' + event.transition.id
196
        after_event = getattr(instance, key, False)
197
    if not after_event:
198
        return
199
    after_event()
200
201
202
def get_workflow_actions(obj):
203
    """ Compile a list of possible workflow transitions for this object
204
    """
205
206
    def translate(id):
207
        return t(PMF(id + "_transition_title"))
208
209
    transids = getAllowedTransitions(obj)
210
    actions = [{'id': it, 'title': translate(it)} for it in transids]
211
    return actions
212
213
214
def isTransitionAllowed(instance, transition_id):
215
    """Checks if the object can perform the transition passed in.
216
    :returns: True if transition can be performed
217
    :rtype: bool
218
    """
219
    wf_tool = getToolByName(instance, "portal_workflow")
220
    for wf_id in wf_tool.getChainFor(instance):
221
        wf = wf_tool.getWorkflowById(wf_id)
222
        if wf and wf.isActionSupported(instance, transition_id):
223
            return True
224
225
    return False
226
227
228
def getAllowedTransitions(instance):
229
    """Returns a list with the transition ids that can be performed against
230
    the instance passed in.
231
    :param instance: A content object
232
    :type instance: ATContentType
233
    :returns: A list of transition/action ids
234
    :rtype: list
235
    """
236
    wftool = getToolByName(instance, "portal_workflow")
237
    transitions = wftool.getTransitionsFor(instance)
238
    return [trans['id'] for trans in transitions]
239
240
241
def wasTransitionPerformed(instance, transition_id):
242
    """Checks if the transition has already been performed to the object
243
    Instance's workflow history is checked.
244
    """
245
    transitions = getReviewHistoryActionsList(instance)
246
    return transition_id in transitions
247
248
249
def getReviewHistoryActionsList(instance, reverse=False):
250
    """Returns a list with the actions performed to the instance
251
    """
252
    review_history = getReviewHistory(instance, reverse=reverse)
253
    return map(lambda event: event["action"], review_history)
254
255
256
def get_prev_status_from_history(instance, status=None):
257
    """Returns the previous status of the object. If status is set, returns the
258
    previous status before the object reached the status passed in.
259
    If instance has reached the status passed in more than once, only the last
260
    one is considered.
261
    """
262
    target = status or api.get_workflow_status_of(instance)
263
    history = getReviewHistory(instance, reverse=True)
264
    history = map(lambda event: event["review_state"], history)
265
    if target not in history or history.index(target) == len(history)-1:
266
        return None
267
    return history[history.index(target)+1]
268
269
270
def getReviewHistory(instance, reverse=True):
271
    """Returns the review history for the instance
272
    :returns: the list of historic events as dicts
273
    """
274
    review_history = []
275
    workflow = getToolByName(instance, 'portal_workflow')
276
    try:
277
        review_history = list(workflow.getInfoFor(instance, 'review_history'))
278
    except WorkflowException:
279
        logger.error("Unable to retrieve review history from {}:{}"
280
                     .format(instance.portal_type, instance.getId()))
281
    if reverse:
282
        # invert the list, so we always see the most recent matching event
283
        review_history.reverse()
284
    return review_history
285
286
def getCurrentState(obj, stateflowid='review_state'):
287
    """ The current state of the object for the state flow id specified
288
        Return empty if there's no workflow state for the object and flow id
289
    """
290
    return api.get_workflow_status_of(obj, stateflowid)
291
292
def in_state(obj, states, stateflowid='review_state'):
293
    """ Returns if the object passed matches with the states passed in
294
    """
295
    if not states:
296
        return False
297
    obj_state = getCurrentState(obj, stateflowid=stateflowid)
298
    return obj_state in states
299
300
def getTransitionActor(obj, action_id):
301
    """Returns the actor that performed a given transition. If transition has
302
    not been perormed, or current user has no privileges, returns None
303
    :return: the username of the user that performed the transition passed-in
304
    :type: string
305
    """
306
    review_history = getReviewHistory(obj)
307
    for event in review_history:
308
        if event.get('action') == action_id:
309
            return event.get('actor')
310
    return None
311
312
313
def getTransitionDate(obj, action_id, return_as_datetime=False):
314
    """
315
    Returns date of action for object. Sometimes we need this date in Datetime
316
    format and that's why added return_as_datetime param.
317
    """
318
    review_history = getReviewHistory(obj)
319
    for event in review_history:
320
        if event.get('action') == action_id:
321
            evtime = event.get('time')
322
            if return_as_datetime:
323
                return evtime
324
            if evtime:
325
                value = ulocalized_time(evtime, long_format=True,
326
                                        time_only=False, context=obj)
327
                return value
328
    return None
329
330
331
def getTransitionUsers(obj, action_id, last_user=False):
332
    """
333
    This function returns a list with the users who have done the transition.
334
    :action_id: a sring as the transition id.
335
    :last_user: a boolean to return only the last user triggering the
336
        transition or all of them.
337
    :returns: a list of user ids.
338
    """
339
    workflow = getToolByName(obj, 'portal_workflow')
340
    users = []
341
    try:
342
        # https://jira.bikalabs.com/browse/LIMS-2242:
343
        # Sometimes the workflow history is inexplicably missing!
344
        review_history = list(workflow.getInfoFor(obj, 'review_history'))
345
    except WorkflowException:
346
        logger.error(
347
            "workflow history is inexplicably missing."
348
            " https://jira.bikalabs.com/browse/LIMS-2242")
349
        return users
350
    # invert the list, so we always see the most recent matching event
351
    review_history.reverse()
352
    for event in review_history:
353
        if event.get('action', '') == action_id:
354
            value = event.get('actor', '')
355
            users.append(value)
356
            if last_user:
357
                return users
358
    return users
359
360
361
def guard_handler(instance, transition_id):
362
    """Generic workflow guard handler that returns true if the transition_id
363
    passed in can be performed to the instance passed in.
364
365
    This function is called automatically by a Script (Python) located at
366
    bika/lims/skins/guard_handler.py, which in turn is fired by Zope when an
367
    expression like "python:here.guard_handler('<transition_id>')" is set to
368
    any given guard (used by default in all bika's DC Workflow guards).
369
370
    Walks through bika.lims.workflow.<obj_type>.guards and looks for a function
371
    that matches with 'guard_<transition_id>'. If found, calls the function and
372
    returns its value (true or false). If not found, returns True by default.
373
374
    :param instance: the object for which the transition_id has to be evaluated
375
    :param transition_id: the id of the transition
376
    :type instance: ATContentType
377
    :type transition_id: string
378
    :return: true if the transition can be performed to the passed in instance
379
    :rtype: bool
380
    """
381
    if not instance:
382
        return True
383
    clazz_name = instance.portal_type
384
    # Inspect if bika.lims.workflow.<clazzname>.<guards> module exists
385
    wf_module = _load_wf_module('{0}.guards'.format(clazz_name.lower()))
386
    if not wf_module:
387
        return True
388
389
    # Inspect if guard_<transition_id> function exists in the above module
390
    key = 'guard_{0}'.format(transition_id)
391
    guard = getattr(wf_module, key, False)
392
    if not guard:
393
        return True
394
395
    #logger.info('{0}.guards.{1}'.format(clazz_name.lower(), key))
396
    return guard(instance)
397
398
399
def _load_wf_module(module_relative_name):
400
    """Loads a python module based on the module relative name passed in.
401
402
    At first, tries to get the module from sys.modules. If not found there, the
403
    function tries to load it by using importlib. Returns None if no module
404
    found or importlib is unable to load it because of errors.
405
    Eg:
406
        _load_wf_module('sample.events')
407
408
    will try to load the module 'bika.lims.workflow.sample.events'
409
410
    :param modrelname: relative name of the module to be loaded
411
    :type modrelname: string
412
    :return: the module
413
    :rtype: module
414
    """
415
    if not module_relative_name:
416
        return None
417
    if not isinstance(module_relative_name, basestring):
418
        return None
419
420
    rootmodname = __name__
421
    modulekey = '{0}.{1}'.format(rootmodname, module_relative_name)
422
    if modulekey in sys.modules:
423
        return sys.modules.get(modulekey, None)
424
425
    # Try to load the module recursively
426
    modname = None
427
    tokens = module_relative_name.split('.')
428
    for part in tokens:
429
        modname = '.'.join([modname, part]) if modname else part
430
        import importlib
431
        try:
432
            _module = importlib.import_module('.'+modname, package=rootmodname)
433
            if not _module:
434
                return None
435
        except Exception:
436
            return None
437
    return sys.modules.get(modulekey, None)
438
439
440
class JSONReadExtender(object):
441
442
    """- Adds the list of possible transitions to each object, if 'transitions'
443
    is specified in the include_fields.
444
    """
445
446
    implements(IJSONReadExtender)
447
448
    def __init__(self, context):
449
        self.context = context
450
451
    def __call__(self, request, data):
452
        include_fields = get_include_fields(request)
453
        if not include_fields or "transitions" in include_fields:
454
            data['transitions'] = get_workflow_actions(self.context)
455
456
457
# TODO Workflow - ActionsPool - Better use ActionHandlerPool
458
class ActionsPool(object):
459
    """Handles transitions of multiple objects at once
460
    """
461
    def __init__(self):
462
        self.actions_pool = collections.OrderedDict()
463
464
    def add(self, instance, action_id):
465
        uid = api.get_uid(instance)
466
        self.actions_pool[uid] = {"instance": instance,
467
                                  "action_id": action_id}
468
469
    def resume(self):
470
        action_handler = ActionHandlerPool.get_instance()
471
        action_handler.queue_pool()
472
        outcome = collections.OrderedDict()
473
        for uid, values in self.actions_pool.items():
474
            instance = values["instance"]
475
            action_id = values["action_id"]
476
            outcome[uid] = doActionFor(instance, action_id)
477
        self.actions_pool = collections.OrderedDict()
478
        action_handler.resume()
479
        return outcome
480
481
482
class ActionHandlerPool(object):
483
    """Singleton to handle concurrent transitions
484
    """
485
    __instance = None
486
487
    @staticmethod
488
    def get_instance():
489
        """Returns the current instance of ActionHandlerPool
490
        """
491
        if ActionHandlerPool.__instance == None:
492
            ActionHandlerPool()
493
        return ActionHandlerPool.__instance
494
495
    def __init__(self):
496
        if ActionHandlerPool.__instance != None:
497
            raise Exception("Use ActionHandlerPool.get_instance()")
498
        self.objects = collections.OrderedDict()
499
        self.num_calls = 0
500
        ActionHandlerPool.__instance = self
501
502
    def __len__(self):
503
        """Number of objects in the pool
504
        """
505
        return len(self.objects)
506
507
    def queue_pool(self):
508
        """Notifies that a new batch of jobs is about to begin
509
        """
510
        self.num_calls += 1
511
512
    def push(self, instance, action, success, idxs=_marker):
513
        """Adds an instance into the pool, to be reindexed on resume
514
        """
515
        uid = api.get_uid(instance)
516
        info = self.objects.get(uid, {})
517
        idx = [] if idxs is _marker else idxs
518
        info[action] = {'success': success, 'idxs': idx}
519
        self.objects[uid] = info
520
521
    def succeed(self, instance, action):
522
        """Returns if the task for the instance took place successfully
523
        """
524
        uid = api.get_uid(instance)
525
        return self.objects.get(uid, {}).get(action, {}).get('success', False)
526
527
    def resume(self):
528
        """Resumes the pool and reindex all objects processed
529
        """
530
        self.num_calls -= 1
531
        if self.num_calls > 0:
532
            return
533
        logger.info("Resume actions for {} objects".format(len(self)))
534
        processed = list()
535
        for uid, info in self.objects.items():
536
            if uid in processed:
537
                continue
538
539
            idxs = self.get_indexes(uid)
540
            idxs_str = idxs and ', '.join(idxs) or "-- All indexes --"
541
            instance = api.get_object_by_uid(uid, default=None)
542
            if instance:
543
                logger.info(
544
                    "Reindexing {}: {}".format(instance.getId(), idxs_str))
545
                instance.reindexObject(idxs=self.get_indexes(uid))
546
            processed.append(uid)
547
        logger.info("Objects processed: {}".format(len(processed)))
548
        self.objects = collections.OrderedDict()
549
550
    def get_indexes(self, uid):
551
        """Returns the names of the indexes to be reindexed for the object with
552
        the uid passed in. If no indexes for this object have been specified
553
        within the action pool job, returns an empty list (reindex all).
554
        Otherwise, return all the indexes that have been specified for the
555
        object within the action pool job.
556
        """
557
        idxs = []
558
        info = self.objects.get(uid, {})
559
        for action_id, value in info.items():
560
            obj_idxs = value.get('idxs', None)
561
            if obj_idxs is None:
562
                # Don't reindex!
563
                continue
564
            elif len(obj_idxs) == 0:
565
                # Reindex all indexes!
566
                return []
567
            idxs.extend(obj_idxs)
568
        # Always reindex review_state and is_active
569
        idxs.extend(["review_state", "is_active"])
570
        return list(set(idxs))
571
572
573
def push_reindex_to_actions_pool(obj, idxs=None):
574
    """Push a reindex job to the actions handler pool
575
    """
576
    indexes = idxs and idxs or []
577
    pool = ActionHandlerPool.get_instance()
578
    pool.push(obj, "reindex", success=True, idxs=indexes)
579