Passed
Push — master ( d6b140...72cad7 )
by Ramon
05:21
created

build.bika.lims.workflow.wasTransitionPerformed()   A

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 1
nop 2
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2019 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import collections
22
import sys
23
24
from AccessControl.SecurityInfo import ModuleSecurityInfo
25
from Products.Archetypes.config import UID_CATALOG
26
from Products.CMFCore.WorkflowCore import WorkflowException
27
from Products.CMFCore.utils import getToolByName
28
from bika.lims import PMF
29
from bika.lims import api
30
from bika.lims import logger
31
from bika.lims.browser import ulocalized_time
32
from bika.lims.interfaces import IJSONReadExtender
33
from bika.lims.jsonapi import get_include_fields
34
from bika.lims.utils import changeWorkflowState
35
from bika.lims.utils import t
36
from bika.lims.workflow.indexes import ACTIONS_TO_INDEXES
37
from zope.interface import implements
38
39
security = ModuleSecurityInfo('bika.lims.workflow')
40
security.declarePublic('guard_handler')
41
42
_marker = object()
43
44
def skip(instance, action, peek=False, unskip=False):
45
    """Returns True if the transition is to be SKIPPED
46
47
        peek - True just checks the value, does not set.
48
        unskip - remove skip key (for manual overrides).
49
50
    called with only (instance, action_id), this will set the request variable preventing the
51
    cascade's from re-transitioning the object and return None.
52
    """
53
54
    uid = callable(instance.UID) and instance.UID() or instance.UID
55
    skipkey = "%s_%s" % (uid, action)
56
    if 'workflow_skiplist' not in instance.REQUEST:
57
        if not peek and not unskip:
58
            instance.REQUEST['workflow_skiplist'] = [skipkey, ]
59
    else:
60
        if skipkey in instance.REQUEST['workflow_skiplist']:
61
            if unskip:
62
                instance.REQUEST['workflow_skiplist'].remove(skipkey)
63
            else:
64
                return True
65
        else:
66
            if not peek and not unskip:
67
                instance.REQUEST["workflow_skiplist"].append(skipkey)
68
69
70
def doActionFor(instance, action_id, idxs=None):
71
    """Tries to perform the transition to the instance.
72
    Object is reindexed after the transition takes place, but only if succeeds.
73
    If idxs is set, only these indexes will be reindexed. Otherwise, will try
74
    to use the indexes defined in ACTIONS_TO_INDEX mapping if any.
75
    :param instance: Object to be transitioned
76
    :param action_id: transition id
77
    :param idxs: indexes to be reindexed after the transition
78
    :returns: True if the transition has been performed, together with message
79
    :rtype: tuple (bool,str)
80
    """
81
    if not instance:
82
        return False, ""
83
84
    if isinstance(instance, list):
85
        # TODO Workflow . Check if this is strictly necessary
86
        # This check is here because sometimes Plone creates a list
87
        # from submitted form elements.
88
        logger.warn("Got a list of obj in doActionFor!")
89
        if len(instance) > 1:
90
            logger.warn(
91
                "doActionFor is getting an instance parameter which is a list "
92
                "with more than one item. Instance: '{}', action_id: '{}'"
93
                .format(instance, action_id)
94
            )
95
96
        return doActionFor(instance=instance[0], action_id=action_id, idxs=idxs)
97
98
    # Since a given transition can cascade or promote to other objects, we want
99
    # to reindex all objects for which the transition succeed at once, at the
100
    # end of process. Otherwise, same object will be reindexed multiple times
101
    # unnecessarily. Also, ActionsHandlerPool ensures the same transition is not
102
    # applied twice to the same object due to cascade/promote recursions.
103
    pool = ActionHandlerPool.get_instance()
104
    if pool.succeed(instance, action_id):
105
        return False, "Transition {} for {} already done"\
106
             .format(action_id, instance.getId())
107
108
    # Return False if transition is not permitted
109
    if not isTransitionAllowed(instance, action_id):
110
        return False, "Transition {} for {} is not allowed"\
111
            .format(action_id, instance.getId())
112
113
    # Add this batch process to the queue
114
    pool.queue_pool()
115
    succeed = False
116
    message = ""
117
    workflow = getToolByName(instance, "portal_workflow")
118
    try:
119
        workflow.doActionFor(instance, action_id)
120
        succeed = True
121
    except WorkflowException as e:
122
        message = str(e)
123
        curr_state = getCurrentState(instance)
124
        clazz_name = instance.__class__.__name__
125
        logger.warning(
126
            "Transition '{0}' not allowed: {1} '{2}' ({3})"\
127
            .format(action_id, clazz_name, instance.getId(), curr_state))
128
        logger.error(message)
129
130
    # If no indexes to reindex have been defined, try to use those defined in
131
    # the ACTIONS_TO_INDEXES mapping. Reindexing only those indexes that might
132
    # be affected by the transition boosts the overall performance!.
133
    if idxs is None:
134
        portal_type = instance.portal_type
135
        idxs = ACTIONS_TO_INDEXES.get(portal_type, {}).get(action_id, [])
136
137
    # Add the current object to the pool and resume
138
    pool.push(instance, action_id, succeed, idxs=idxs)
139
    pool.resume()
140
141
    return succeed, message
142
143
144
def call_workflow_event(instance, event, after=True):
145
    """Calls the instance's workflow event
146
    """
147
    if not event.transition:
148
        return False
149
150
    portal_type = instance.portal_type
151
    wf_module = _load_wf_module('{}.events'.format(portal_type.lower()))
152
    if not wf_module:
153
        return False
154
155
    # Inspect if event_<transition_id> function exists in the module
156
    prefix = after and "after" or "before"
157
    func_name = "{}_{}".format(prefix, event.transition.id)
158
    func = getattr(wf_module, func_name, False)
159
    if not func:
160
        return False
161
162
    logger.info('WF event: {0}.events.{1}'
163
                .format(portal_type.lower(), func_name))
164
    func(instance)
165
    return True
166
167
168
def BeforeTransitionEventHandler(instance, event):
169
    """ This event is executed before each transition and delegates further
170
    actions to 'workflow.<portal_type>.events.before_<transition_id> function
171
    if exists for the instance passed in.
172
    :param instance: the instance to be transitioned
173
    :type instance: ATContentType
174
    :param event: event that holds the transition to be performed
175
    :type event: IObjectEvent
176
    """
177
    call_workflow_event(instance, event, after=False)
178
179
180
def AfterTransitionEventHandler(instance, event):
181
    """ This event is executed after each transition and delegates further
182
    actions to 'workflow.<portal_type>.events.after_<transition_id> function
183
    if exists for the instance passed in.
184
    :param instance: the instance that has been transitioned
185
    :type instance: ATContentType
186
    :param event: event that holds the transition performed
187
    :type event: IObjectEvent
188
    """
189
    if call_workflow_event(instance, event, after=True):
190
        return
191
192
    # Try with old AfterTransitionHandler dance...
193
    # TODO CODE TO BE REMOVED AFTER PORTING workflow_script_*/*_transition_event
194
    if not event.transition:
195
        return
196
    # Set the request variable preventing cascade's from re-transitioning.
197
    if skip(instance, event.transition.id):
198
        return
199
    # Because at this point, the object has been transitioned already, but
200
    # further actions are probably needed still, so be sure is reindexed
201
    # before going forward.
202
    instance.reindexObject()
203
    key = 'after_{0}_transition_event'.format(event.transition.id)
204
    after_event = getattr(instance, key, False)
205
    if not after_event:
206
        # TODO Workflow. this conditional is only for backwards compatibility,
207
        # to be removed when all workflow_script_* methods in contents are
208
        # replaced by the more explicity signature 'after_*_transition_event'
209
        key = 'workflow_script_' + event.transition.id
210
        after_event = getattr(instance, key, False)
211
    if not after_event:
212
        return
213
    after_event()
214
215
216
def get_workflow_actions(obj):
217
    """ Compile a list of possible workflow transitions for this object
218
    """
219
220
    def translate(id):
221
        return t(PMF(id + "_transition_title"))
222
223
    transids = getAllowedTransitions(obj)
224
    actions = [{'id': it, 'title': translate(it)} for it in transids]
225
    return actions
226
227
228
def isTransitionAllowed(instance, transition_id):
229
    """Checks if the object can perform the transition passed in.
230
    :returns: True if transition can be performed
231
    :rtype: bool
232
    """
233
    wf_tool = getToolByName(instance, "portal_workflow")
234
    for wf_id in wf_tool.getChainFor(instance):
235
        wf = wf_tool.getWorkflowById(wf_id)
236
        if wf and wf.isActionSupported(instance, transition_id):
237
            return True
238
239
    return False
240
241
242
def getAllowedTransitions(instance):
243
    """Returns a list with the transition ids that can be performed against
244
    the instance passed in.
245
    :param instance: A content object
246
    :type instance: ATContentType
247
    :returns: A list of transition/action ids
248
    :rtype: list
249
    """
250
    wftool = getToolByName(instance, "portal_workflow")
251
    transitions = wftool.getTransitionsFor(instance)
252
    return [trans['id'] for trans in transitions]
253
254
255
def get_review_history_statuses(instance, reverse=False):
256
    """Returns a list with the statuses of the instance from the review_history
257
    """
258
    review_history = getReviewHistory(instance, reverse=reverse)
259
    return map(lambda event: event["review_state"], review_history)
260
261
262
def get_prev_status_from_history(instance, status=None):
263
    """Returns the previous status of the object. If status is set, returns the
264
    previous status before the object reached the status passed in.
265
    If instance has reached the status passed in more than once, only the last
266
    one is considered.
267
    """
268
    target = status or api.get_workflow_status_of(instance)
269
    history = getReviewHistory(instance, reverse=True)
270
    history = map(lambda event: event["review_state"], history)
271
    if target not in history or history.index(target) == len(history)-1:
272
        return None
273
    return history[history.index(target)+1]
274
275
276
def getReviewHistory(instance, reverse=True):
277
    """Returns the review history for the instance
278
    :returns: the list of historic events as dicts
279
    """
280
    return api.get_review_history(instance, rev=reverse)
281
282
283
def getCurrentState(obj, stateflowid='review_state'):
284
    """ The current state of the object for the state flow id specified
285
        Return empty if there's no workflow state for the object and flow id
286
    """
287
    return api.get_workflow_status_of(obj, stateflowid)
288
289
290
def in_state(obj, states, stateflowid='review_state'):
291
    """ Returns if the object passed matches with the states passed in
292
    """
293
    if not states:
294
        return False
295
    obj_state = getCurrentState(obj, stateflowid=stateflowid)
296
    return obj_state in states
297
298
299
def getTransitionActor(obj, action_id):
300
    """Returns the actor that performed a given transition. If transition has
301
    not been perormed, or current user has no privileges, returns None
302
    :return: the username of the user that performed the transition passed-in
303
    :type: string
304
    """
305
    review_history = getReviewHistory(obj)
306
    for event in review_history:
307
        if event.get('action') == action_id:
308
            return event.get('actor')
309
    return None
310
311
312
def getTransitionDate(obj, action_id, return_as_datetime=False):
313
    """
314
    Returns date of action for object. Sometimes we need this date in Datetime
315
    format and that's why added return_as_datetime param.
316
    """
317
    review_history = getReviewHistory(obj)
318
    for event in review_history:
319
        if event.get('action') == action_id:
320
            evtime = event.get('time')
321
            if return_as_datetime:
322
                return evtime
323
            if evtime:
324
                value = ulocalized_time(evtime, long_format=True,
325
                                        time_only=False, context=obj)
326
                return value
327
    return None
328
329
330
def getTransitionUsers(obj, action_id, last_user=False):
331
    """
332
    This function returns a list with the users who have done the transition.
333
    :action_id: a sring as the transition id.
334
    :last_user: a boolean to return only the last user triggering the
335
        transition or all of them.
336
    :returns: a list of user ids.
337
    """
338
    workflow = getToolByName(obj, 'portal_workflow')
339
    users = []
340
    try:
341
        # https://jira.bikalabs.com/browse/LIMS-2242:
342
        # Sometimes the workflow history is inexplicably missing!
343
        review_history = list(workflow.getInfoFor(obj, 'review_history'))
344
    except WorkflowException:
345
        logger.error(
346
            "workflow history is inexplicably missing."
347
            " https://jira.bikalabs.com/browse/LIMS-2242")
348
        return users
349
    # invert the list, so we always see the most recent matching event
350
    review_history.reverse()
351
    for event in review_history:
352
        if event.get('action', '') == action_id:
353
            value = event.get('actor', '')
354
            users.append(value)
355
            if last_user:
356
                return users
357
    return users
358
359
360
def guard_handler(instance, transition_id):
361
    """Generic workflow guard handler that returns true if the transition_id
362
    passed in can be performed to the instance passed in.
363
364
    This function is called automatically by a Script (Python) located at
365
    bika/lims/skins/guard_handler.py, which in turn is fired by Zope when an
366
    expression like "python:here.guard_handler('<transition_id>')" is set to
367
    any given guard (used by default in all bika's DC Workflow guards).
368
369
    Walks through bika.lims.workflow.<obj_type>.guards and looks for a function
370
    that matches with 'guard_<transition_id>'. If found, calls the function and
371
    returns its value (true or false). If not found, returns True by default.
372
373
    :param instance: the object for which the transition_id has to be evaluated
374
    :param transition_id: the id of the transition
375
    :type instance: ATContentType
376
    :type transition_id: string
377
    :return: true if the transition can be performed to the passed in instance
378
    :rtype: bool
379
    """
380
    if not instance:
381
        return True
382
    clazz_name = instance.portal_type
383
    # Inspect if bika.lims.workflow.<clazzname>.<guards> module exists
384
    wf_module = _load_wf_module('{0}.guards'.format(clazz_name.lower()))
385
    if not wf_module:
386
        return True
387
388
    # Inspect if guard_<transition_id> function exists in the above module
389
    key = 'guard_{0}'.format(transition_id)
390
    guard = getattr(wf_module, key, False)
391
    if not guard:
392
        return True
393
394
    #logger.info('{0}.guards.{1}'.format(clazz_name.lower(), key))
395
    return guard(instance)
396
397
398
def _load_wf_module(module_relative_name):
399
    """Loads a python module based on the module relative name passed in.
400
401
    At first, tries to get the module from sys.modules. If not found there, the
402
    function tries to load it by using importlib. Returns None if no module
403
    found or importlib is unable to load it because of errors.
404
    Eg:
405
        _load_wf_module('sample.events')
406
407
    will try to load the module 'bika.lims.workflow.sample.events'
408
409
    :param modrelname: relative name of the module to be loaded
410
    :type modrelname: string
411
    :return: the module
412
    :rtype: module
413
    """
414
    if not module_relative_name:
415
        return None
416
    if not isinstance(module_relative_name, basestring):
417
        return None
418
419
    rootmodname = __name__
420
    modulekey = '{0}.{1}'.format(rootmodname, module_relative_name)
421
    if modulekey in sys.modules:
422
        return sys.modules.get(modulekey, None)
423
424
    # Try to load the module recursively
425
    modname = None
426
    tokens = module_relative_name.split('.')
427
    for part in tokens:
428
        modname = '.'.join([modname, part]) if modname else part
429
        import importlib
430
        try:
431
            _module = importlib.import_module('.'+modname, package=rootmodname)
432
            if not _module:
433
                return None
434
        except Exception:
435
            return None
436
    return sys.modules.get(modulekey, None)
437
438
439
class JSONReadExtender(object):
440
441
    """- Adds the list of possible transitions to each object, if 'transitions'
442
    is specified in the include_fields.
443
    """
444
445
    implements(IJSONReadExtender)
446
447
    def __init__(self, context):
448
        self.context = context
449
450
    def __call__(self, request, data):
451
        include_fields = get_include_fields(request)
452
        if not include_fields or "transitions" in include_fields:
453
            data['transitions'] = get_workflow_actions(self.context)
454
455
456
class ActionHandlerPool(object):
457
    """Singleton to handle concurrent transitions
458
    """
459
    __instance = None
460
461
    @staticmethod
462
    def get_instance():
463
        """Returns the current instance of ActionHandlerPool
464
        """
465
        if ActionHandlerPool.__instance == None:
466
            ActionHandlerPool()
467
        return ActionHandlerPool.__instance
468
469
    def __init__(self):
470
        if ActionHandlerPool.__instance != None:
471
            raise Exception("Use ActionHandlerPool.get_instance()")
472
        self.objects = collections.OrderedDict()
473
        self.num_calls = 0
474
        ActionHandlerPool.__instance = self
475
476
    def __len__(self):
477
        """Number of objects in the pool
478
        """
479
        return len(self.objects)
480
481
    def queue_pool(self):
482
        """Notifies that a new batch of jobs is about to begin
483
        """
484
        self.num_calls += 1
485
486
    def push(self, instance, action, success, idxs=_marker):
487
        """Adds an instance into the pool, to be reindexed on resume
488
        """
489
        uid = api.get_uid(instance)
490
        info = self.objects.get(uid, {})
491
        idx = [] if idxs is _marker else idxs
492
        info[action] = {'success': success, 'idxs': idx}
493
        self.objects[uid] = info
494
495
    def succeed(self, instance, action):
496
        """Returns if the task for the instance took place successfully
497
        """
498
        uid = api.get_uid(instance)
499
        return self.objects.get(uid, {}).get(action, {}).get('success', False)
500
501
    def resume(self):
502
        """Resumes the pool and reindex all objects processed
503
        """
504
        self.num_calls -= 1
505
        if self.num_calls > 0:
506
            return
507
        logger.info("Resume actions for {} objects".format(len(self)))
508
509
        # Fetch the objects from the pool
510
        processed = list()
511
        for brain in api.search(dict(UID=self.objects.keys()), UID_CATALOG):
512
            uid = api.get_uid(brain)
513
            if uid in processed:
514
                # This object has been processed already, do nothing
515
                continue
516
517
            # Reindex the object
518
            obj = api.get_object(brain)
519
            idxs = self.get_indexes(uid)
520
            idxs_str = idxs and ', '.join(idxs) or "-- All indexes --"
521
            logger.info("Reindexing {}: {}".format(obj.getId(), idxs_str))
522
            obj.reindexObject(idxs=idxs)
523
            processed.append(uid)
524
525
        # Cleanup the pool
526
        logger.info("Objects processed: {}".format(len(processed)))
527
        self.objects = collections.OrderedDict()
528
529
    def get_indexes(self, uid):
530
        """Returns the names of the indexes to be reindexed for the object with
531
        the uid passed in. If no indexes for this object have been specified
532
        within the action pool job, returns an empty list (reindex all).
533
        Otherwise, return all the indexes that have been specified for the
534
        object within the action pool job.
535
        """
536
        idxs = []
537
        info = self.objects.get(uid, {})
538
        for action_id, value in info.items():
539
            obj_idxs = value.get('idxs', None)
540
            if obj_idxs is None:
541
                # Don't reindex!
542
                continue
543
            elif len(obj_idxs) == 0:
544
                # Reindex all indexes!
545
                return []
546
            idxs.extend(obj_idxs)
547
        # Always reindex review_state and is_active
548
        idxs.extend(["review_state", "is_active"])
549
        return list(set(idxs))
550
551
552
def push_reindex_to_actions_pool(obj, idxs=None):
553
    """Push a reindex job to the actions handler pool
554
    """
555
    indexes = idxs and idxs or []
556
    pool = ActionHandlerPool.get_instance()
557
    pool.push(obj, "reindex", success=True, idxs=indexes)
558