Passed
Push — master ( d8e2ec...90ae0b )
by Jordi
10:07 queued 04:19
created

ActionHandlerPool.__len__()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE
4
#
5
# Copyright 2018 by it's authors.
6
# Some rights reserved. See LICENSE.rst, CONTRIBUTORS.rst.
7
8
import collections
9
import sys
10
11
from AccessControl.SecurityInfo import ModuleSecurityInfo
12
from Products.Archetypes.config import UID_CATALOG
13
from Products.CMFCore.WorkflowCore import WorkflowException
14
from Products.CMFCore.utils import getToolByName
15
from bika.lims import PMF
16
from bika.lims import api
17
from bika.lims import logger
18
from bika.lims.browser import ulocalized_time
19
from bika.lims.interfaces import IJSONReadExtender
20
from bika.lims.jsonapi import get_include_fields
21
from bika.lims.utils import changeWorkflowState
22
from bika.lims.utils import t
23
from bika.lims.workflow.indexes import ACTIONS_TO_INDEXES
24
from zope.interface import implements
25
26
security = ModuleSecurityInfo('bika.lims.workflow')
27
security.declarePublic('guard_handler')
28
29
_marker = object()
30
31
def skip(instance, action, peek=False, unskip=False):
32
    """Returns True if the transition is to be SKIPPED
33
34
        peek - True just checks the value, does not set.
35
        unskip - remove skip key (for manual overrides).
36
37
    called with only (instance, action_id), this will set the request variable preventing the
38
    cascade's from re-transitioning the object and return None.
39
    """
40
41
    uid = callable(instance.UID) and instance.UID() or instance.UID
42
    skipkey = "%s_%s" % (uid, action)
43
    if 'workflow_skiplist' not in instance.REQUEST:
44
        if not peek and not unskip:
45
            instance.REQUEST['workflow_skiplist'] = [skipkey, ]
46
    else:
47
        if skipkey in instance.REQUEST['workflow_skiplist']:
48
            if unskip:
49
                instance.REQUEST['workflow_skiplist'].remove(skipkey)
50
            else:
51
                return True
52
        else:
53
            if not peek and not unskip:
54
                instance.REQUEST["workflow_skiplist"].append(skipkey)
55
56
57
def doActionFor(instance, action_id, idxs=None):
58
    """Tries to perform the transition to the instance.
59
    Object is reindexed after the transition takes place, but only if succeeds.
60
    If idxs is set, only these indexes will be reindexed. Otherwise, will try
61
    to use the indexes defined in ACTIONS_TO_INDEX mapping if any.
62
    :param instance: Object to be transitioned
63
    :param action_id: transition id
64
    :param idxs: indexes to be reindexed after the transition
65
    :returns: True if the transition has been performed, together with message
66
    :rtype: tuple (bool,str)
67
    """
68
    if not instance:
69
        return False, ""
70
71
    if isinstance(instance, list):
72
        # TODO Workflow . Check if this is strictly necessary
73
        # This check is here because sometimes Plone creates a list
74
        # from submitted form elements.
75
        logger.warn("Got a list of obj in doActionFor!")
76
        if len(instance) > 1:
77
            logger.warn(
78
                "doActionFor is getting an instance parameter which is a list "
79
                "with more than one item. Instance: '{}', action_id: '{}'"
80
                .format(instance, action_id)
81
            )
82
83
        return doActionFor(instance=instance[0], action_id=action_id, idxs=idxs)
84
85
    # Since a given transition can cascade or promote to other objects, we want
86
    # to reindex all objects for which the transition succeed at once, at the
87
    # end of process. Otherwise, same object will be reindexed multiple times
88
    # unnecessarily. Also, ActionsHandlerPool ensures the same transition is not
89
    # applied twice to the same object due to cascade/promote recursions.
90
    pool = ActionHandlerPool.get_instance()
91
    if pool.succeed(instance, action_id):
92
        return False, "Transition {} for {} already done"\
93
             .format(action_id, instance.getId())
94
95
    # Return False if transition is not permitted
96
    if not isTransitionAllowed(instance, action_id):
97
        return False, "Transition {} for {} is not allowed"\
98
            .format(action_id, instance.getId())
99
100
    # Add this batch process to the queue
101
    pool.queue_pool()
102
    succeed = False
103
    message = ""
104
    workflow = getToolByName(instance, "portal_workflow")
105
    try:
106
        workflow.doActionFor(instance, action_id)
107
        succeed = True
108
    except WorkflowException as e:
109
        message = str(e)
110
        curr_state = getCurrentState(instance)
111
        clazz_name = instance.__class__.__name__
112
        logger.warning(
113
            "Transition '{0}' not allowed: {1} '{2}' ({3})"\
114
            .format(action_id, clazz_name, instance.getId(), curr_state))
115
        logger.error(message)
116
117
    # If no indexes to reindex have been defined, try to use those defined in
118
    # the ACTIONS_TO_INDEXES mapping. Reindexing only those indexes that might
119
    # be affected by the transition boosts the overall performance!.
120
    if idxs is None:
121
        portal_type = instance.portal_type
122
        idxs = ACTIONS_TO_INDEXES.get(portal_type, {}).get(action_id, [])
123
124
    # Add the current object to the pool and resume
125
    pool.push(instance, action_id, succeed, idxs=idxs)
126
    pool.resume()
127
128
    return succeed, message
129
130
131
def call_workflow_event(instance, event, after=True):
132
    """Calls the instance's workflow event
133
    """
134
    if not event.transition:
135
        return False
136
137
    portal_type = instance.portal_type
138
    wf_module = _load_wf_module('{}.events'.format(portal_type.lower()))
139
    if not wf_module:
140
        return False
141
142
    # Inspect if event_<transition_id> function exists in the module
143
    prefix = after and "after" or "before"
144
    func_name = "{}_{}".format(prefix, event.transition.id)
145
    func = getattr(wf_module, func_name, False)
146
    if not func:
147
        return False
148
149
    logger.info('WF event: {0}.events.{1}'
150
                .format(portal_type.lower(), func_name))
151
    func(instance)
152
    return True
153
154
155
def BeforeTransitionEventHandler(instance, event):
156
    """ This event is executed before each transition and delegates further
157
    actions to 'workflow.<portal_type>.events.before_<transition_id> function
158
    if exists for the instance passed in.
159
    :param instance: the instance to be transitioned
160
    :type instance: ATContentType
161
    :param event: event that holds the transition to be performed
162
    :type event: IObjectEvent
163
    """
164
    call_workflow_event(instance, event, after=False)
165
166
167
def AfterTransitionEventHandler(instance, event):
168
    """ This event is executed after each transition and delegates further
169
    actions to 'workflow.<portal_type>.events.after_<transition_id> function
170
    if exists for the instance passed in.
171
    :param instance: the instance that has been transitioned
172
    :type instance: ATContentType
173
    :param event: event that holds the transition performed
174
    :type event: IObjectEvent
175
    """
176
    if call_workflow_event(instance, event, after=True):
177
        return
178
179
    # Try with old AfterTransitionHandler dance...
180
    # TODO CODE TO BE REMOVED AFTER PORTING workflow_script_*/*_transition_event
181
    if not event.transition:
182
        return
183
    # Set the request variable preventing cascade's from re-transitioning.
184
    if skip(instance, event.transition.id):
185
        return
186
    # Because at this point, the object has been transitioned already, but
187
    # further actions are probably needed still, so be sure is reindexed
188
    # before going forward.
189
    instance.reindexObject()
190
    key = 'after_{0}_transition_event'.format(event.transition.id)
191
    after_event = getattr(instance, key, False)
192
    if not after_event:
193
        # TODO Workflow. this conditional is only for backwards compatibility,
194
        # to be removed when all workflow_script_* methods in contents are
195
        # replaced by the more explicity signature 'after_*_transition_event'
196
        key = 'workflow_script_' + event.transition.id
197
        after_event = getattr(instance, key, False)
198
    if not after_event:
199
        return
200
    after_event()
201
202
203
def get_workflow_actions(obj):
204
    """ Compile a list of possible workflow transitions for this object
205
    """
206
207
    def translate(id):
208
        return t(PMF(id + "_transition_title"))
209
210
    transids = getAllowedTransitions(obj)
211
    actions = [{'id': it, 'title': translate(it)} for it in transids]
212
    return actions
213
214
215
def isTransitionAllowed(instance, transition_id):
216
    """Checks if the object can perform the transition passed in.
217
    :returns: True if transition can be performed
218
    :rtype: bool
219
    """
220
    wf_tool = getToolByName(instance, "portal_workflow")
221
    for wf_id in wf_tool.getChainFor(instance):
222
        wf = wf_tool.getWorkflowById(wf_id)
223
        if wf and wf.isActionSupported(instance, transition_id):
224
            return True
225
226
    return False
227
228
229
def getAllowedTransitions(instance):
230
    """Returns a list with the transition ids that can be performed against
231
    the instance passed in.
232
    :param instance: A content object
233
    :type instance: ATContentType
234
    :returns: A list of transition/action ids
235
    :rtype: list
236
    """
237
    wftool = getToolByName(instance, "portal_workflow")
238
    transitions = wftool.getTransitionsFor(instance)
239
    return [trans['id'] for trans in transitions]
240
241
242
def wasTransitionPerformed(instance, transition_id):
243
    """Checks if the transition has already been performed to the object
244
    Instance's workflow history is checked.
245
    """
246
    transitions = getReviewHistoryActionsList(instance)
247
    return transition_id in transitions
248
249
250
def getReviewHistoryActionsList(instance, reverse=False):
251
    """Returns a list with the actions performed to the instance
252
    """
253
    review_history = getReviewHistory(instance, reverse=reverse)
254
    return map(lambda event: event["action"], review_history)
255
256
257
def get_prev_status_from_history(instance, status=None):
258
    """Returns the previous status of the object. If status is set, returns the
259
    previous status before the object reached the status passed in.
260
    If instance has reached the status passed in more than once, only the last
261
    one is considered.
262
    """
263
    target = status or api.get_workflow_status_of(instance)
264
    history = getReviewHistory(instance, reverse=True)
265
    history = map(lambda event: event["review_state"], history)
266
    if target not in history or history.index(target) == len(history)-1:
267
        return None
268
    return history[history.index(target)+1]
269
270
271
def getReviewHistory(instance, reverse=True):
272
    """Returns the review history for the instance
273
    :returns: the list of historic events as dicts
274
    """
275
    review_history = []
276
    workflow = getToolByName(instance, 'portal_workflow')
277
    try:
278
        review_history = list(workflow.getInfoFor(instance, 'review_history'))
279
    except WorkflowException:
280
        logger.error("Unable to retrieve review history from {}:{}"
281
                     .format(instance.portal_type, instance.getId()))
282
    if reverse:
283
        # invert the list, so we always see the most recent matching event
284
        review_history.reverse()
285
    return review_history
286
287
def getCurrentState(obj, stateflowid='review_state'):
288
    """ The current state of the object for the state flow id specified
289
        Return empty if there's no workflow state for the object and flow id
290
    """
291
    return api.get_workflow_status_of(obj, stateflowid)
292
293
294
def in_state(obj, states, stateflowid='review_state'):
295
    """ Returns if the object passed matches with the states passed in
296
    """
297
    if not states:
298
        return False
299
    obj_state = getCurrentState(obj, stateflowid=stateflowid)
300
    return obj_state in states
301
302
303
def getTransitionActor(obj, action_id):
304
    """Returns the actor that performed a given transition. If transition has
305
    not been perormed, or current user has no privileges, returns None
306
    :return: the username of the user that performed the transition passed-in
307
    :type: string
308
    """
309
    review_history = getReviewHistory(obj)
310
    for event in review_history:
311
        if event.get('action') == action_id:
312
            return event.get('actor')
313
    return None
314
315
316
def getTransitionDate(obj, action_id, return_as_datetime=False):
317
    """
318
    Returns date of action for object. Sometimes we need this date in Datetime
319
    format and that's why added return_as_datetime param.
320
    """
321
    review_history = getReviewHistory(obj)
322
    for event in review_history:
323
        if event.get('action') == action_id:
324
            evtime = event.get('time')
325
            if return_as_datetime:
326
                return evtime
327
            if evtime:
328
                value = ulocalized_time(evtime, long_format=True,
329
                                        time_only=False, context=obj)
330
                return value
331
    return None
332
333
334
def getTransitionUsers(obj, action_id, last_user=False):
335
    """
336
    This function returns a list with the users who have done the transition.
337
    :action_id: a sring as the transition id.
338
    :last_user: a boolean to return only the last user triggering the
339
        transition or all of them.
340
    :returns: a list of user ids.
341
    """
342
    workflow = getToolByName(obj, 'portal_workflow')
343
    users = []
344
    try:
345
        # https://jira.bikalabs.com/browse/LIMS-2242:
346
        # Sometimes the workflow history is inexplicably missing!
347
        review_history = list(workflow.getInfoFor(obj, 'review_history'))
348
    except WorkflowException:
349
        logger.error(
350
            "workflow history is inexplicably missing."
351
            " https://jira.bikalabs.com/browse/LIMS-2242")
352
        return users
353
    # invert the list, so we always see the most recent matching event
354
    review_history.reverse()
355
    for event in review_history:
356
        if event.get('action', '') == action_id:
357
            value = event.get('actor', '')
358
            users.append(value)
359
            if last_user:
360
                return users
361
    return users
362
363
364
def guard_handler(instance, transition_id):
365
    """Generic workflow guard handler that returns true if the transition_id
366
    passed in can be performed to the instance passed in.
367
368
    This function is called automatically by a Script (Python) located at
369
    bika/lims/skins/guard_handler.py, which in turn is fired by Zope when an
370
    expression like "python:here.guard_handler('<transition_id>')" is set to
371
    any given guard (used by default in all bika's DC Workflow guards).
372
373
    Walks through bika.lims.workflow.<obj_type>.guards and looks for a function
374
    that matches with 'guard_<transition_id>'. If found, calls the function and
375
    returns its value (true or false). If not found, returns True by default.
376
377
    :param instance: the object for which the transition_id has to be evaluated
378
    :param transition_id: the id of the transition
379
    :type instance: ATContentType
380
    :type transition_id: string
381
    :return: true if the transition can be performed to the passed in instance
382
    :rtype: bool
383
    """
384
    if not instance:
385
        return True
386
    clazz_name = instance.portal_type
387
    # Inspect if bika.lims.workflow.<clazzname>.<guards> module exists
388
    wf_module = _load_wf_module('{0}.guards'.format(clazz_name.lower()))
389
    if not wf_module:
390
        return True
391
392
    # Inspect if guard_<transition_id> function exists in the above module
393
    key = 'guard_{0}'.format(transition_id)
394
    guard = getattr(wf_module, key, False)
395
    if not guard:
396
        return True
397
398
    #logger.info('{0}.guards.{1}'.format(clazz_name.lower(), key))
399
    return guard(instance)
400
401
402
def _load_wf_module(module_relative_name):
403
    """Loads a python module based on the module relative name passed in.
404
405
    At first, tries to get the module from sys.modules. If not found there, the
406
    function tries to load it by using importlib. Returns None if no module
407
    found or importlib is unable to load it because of errors.
408
    Eg:
409
        _load_wf_module('sample.events')
410
411
    will try to load the module 'bika.lims.workflow.sample.events'
412
413
    :param modrelname: relative name of the module to be loaded
414
    :type modrelname: string
415
    :return: the module
416
    :rtype: module
417
    """
418
    if not module_relative_name:
419
        return None
420
    if not isinstance(module_relative_name, basestring):
421
        return None
422
423
    rootmodname = __name__
424
    modulekey = '{0}.{1}'.format(rootmodname, module_relative_name)
425
    if modulekey in sys.modules:
426
        return sys.modules.get(modulekey, None)
427
428
    # Try to load the module recursively
429
    modname = None
430
    tokens = module_relative_name.split('.')
431
    for part in tokens:
432
        modname = '.'.join([modname, part]) if modname else part
433
        import importlib
434
        try:
435
            _module = importlib.import_module('.'+modname, package=rootmodname)
436
            if not _module:
437
                return None
438
        except Exception:
439
            return None
440
    return sys.modules.get(modulekey, None)
441
442
443
class JSONReadExtender(object):
444
445
    """- Adds the list of possible transitions to each object, if 'transitions'
446
    is specified in the include_fields.
447
    """
448
449
    implements(IJSONReadExtender)
450
451
    def __init__(self, context):
452
        self.context = context
453
454
    def __call__(self, request, data):
455
        include_fields = get_include_fields(request)
456
        if not include_fields or "transitions" in include_fields:
457
            data['transitions'] = get_workflow_actions(self.context)
458
459
460
class ActionHandlerPool(object):
461
    """Singleton to handle concurrent transitions
462
    """
463
    __instance = None
464
465
    @staticmethod
466
    def get_instance():
467
        """Returns the current instance of ActionHandlerPool
468
        """
469
        if ActionHandlerPool.__instance == None:
470
            ActionHandlerPool()
471
        return ActionHandlerPool.__instance
472
473
    def __init__(self):
474
        if ActionHandlerPool.__instance != None:
475
            raise Exception("Use ActionHandlerPool.get_instance()")
476
        self.objects = collections.OrderedDict()
477
        self.num_calls = 0
478
        ActionHandlerPool.__instance = self
479
480
    def __len__(self):
481
        """Number of objects in the pool
482
        """
483
        return len(self.objects)
484
485
    def queue_pool(self):
486
        """Notifies that a new batch of jobs is about to begin
487
        """
488
        self.num_calls += 1
489
490
    def push(self, instance, action, success, idxs=_marker):
491
        """Adds an instance into the pool, to be reindexed on resume
492
        """
493
        uid = api.get_uid(instance)
494
        info = self.objects.get(uid, {})
495
        idx = [] if idxs is _marker else idxs
496
        info[action] = {'success': success, 'idxs': idx}
497
        self.objects[uid] = info
498
499
    def succeed(self, instance, action):
500
        """Returns if the task for the instance took place successfully
501
        """
502
        uid = api.get_uid(instance)
503
        return self.objects.get(uid, {}).get(action, {}).get('success', False)
504
505
    def resume(self):
506
        """Resumes the pool and reindex all objects processed
507
        """
508
        self.num_calls -= 1
509
        if self.num_calls > 0:
510
            return
511
        logger.info("Resume actions for {} objects".format(len(self)))
512
513
        # Fetch the objects from the pool
514
        processed = list()
515
        for brain in api.search(dict(UID=self.objects.keys()), UID_CATALOG):
516
            uid = api.get_uid(brain)
517
            if uid in processed:
518
                # This object has been processed already, do nothing
519
                continue
520
521
            # Reindex the object
522
            obj = api.get_object(brain)
523
            idxs = self.get_indexes(uid)
524
            idxs_str = idxs and ', '.join(idxs) or "-- All indexes --"
525
            logger.info("Reindexing {}: {}".format(obj.getId(), idxs_str))
526
            obj.reindexObject(idxs=idxs)
527
            processed.append(uid)
528
529
        # Cleanup the pool
530
        logger.info("Objects processed: {}".format(len(processed)))
531
        self.objects = collections.OrderedDict()
532
533
    def get_indexes(self, uid):
534
        """Returns the names of the indexes to be reindexed for the object with
535
        the uid passed in. If no indexes for this object have been specified
536
        within the action pool job, returns an empty list (reindex all).
537
        Otherwise, return all the indexes that have been specified for the
538
        object within the action pool job.
539
        """
540
        idxs = []
541
        info = self.objects.get(uid, {})
542
        for action_id, value in info.items():
543
            obj_idxs = value.get('idxs', None)
544
            if obj_idxs is None:
545
                # Don't reindex!
546
                continue
547
            elif len(obj_idxs) == 0:
548
                # Reindex all indexes!
549
                return []
550
            idxs.extend(obj_idxs)
551
        # Always reindex review_state and is_active
552
        idxs.extend(["review_state", "is_active"])
553
        return list(set(idxs))
554
555
556
def push_reindex_to_actions_pool(obj, idxs=None):
557
    """Push a reindex job to the actions handler pool
558
    """
559
    indexes = idxs and idxs or []
560
    pool = ActionHandlerPool.get_instance()
561
    pool.push(obj, "reindex", success=True, idxs=indexes)
562