Passed
Push — 2.x ( 482653...bdcbee )
by Ramon
04:59
created

bika.lims.workflow.ActionHandlerPool.resume()   A

Complexity

Conditions 5

Size

Total Lines 30
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 17
dl 0
loc 30
rs 9.0833
c 0
b 0
f 0
cc 5
nop 1
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2021 by its authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import collections
22
import six
23
import sys
24
import threading
25
26
from AccessControl.SecurityInfo import ModuleSecurityInfo
27
from bika.lims import PMF
28
from bika.lims import api
29
from bika.lims import logger
30
from bika.lims.browser import ulocalized_time
31
from bika.lims.decorators import synchronized
32
from bika.lims.interfaces import IActionHandlerPool, IGuardAdapter
33
from bika.lims.interfaces import IJSONReadExtender
34
from bika.lims.jsonapi import get_include_fields
35
from bika.lims.utils import changeWorkflowState  # noqa
36
from bika.lims.utils import t
37
from Products.Archetypes.config import UID_CATALOG
38
from Products.CMFCore.utils import getToolByName
39
from Products.CMFCore.WorkflowCore import WorkflowException
40
from zope.component import getAdapters
41
from zope.interface import implements
42
from ZPublisher.HTTPRequest import HTTPRequest
43
44
# Security declarations object for the 'bika.lims.workflow' module
security = ModuleSecurityInfo('bika.lims.workflow')
45
# Declare 'guard_handler' as public.
# NOTE(review): presumably required because guard_handler is called from a
# Script (Python), as its docstring states -- confirm
security.declarePublic('guard_handler')
46
47
# Module-level sentinel object.
# NOTE(review): not referenced by any code visible in this section of the
# file -- confirm whether it is still needed
_marker = object()
48
49
50
def skip(instance, action, peek=False, unskip=False):
    """Returns True if the transition is to be SKIPPED

    The skip list is stored in ``instance.REQUEST['workflow_skiplist']`` and
    holds keys with the form ``"<uid>_<action>"``.

    Called with only (instance, action), this will set the request variable
    preventing the cascades from re-transitioning the object and return None.

    :param instance: object that provides ``UID`` (attribute or method) and
        ``REQUEST``
    :param action: id of the transition
    :param peek: True just checks the value, does not set.
    :param unskip: remove skip key (for manual overrides).
    :returns: True if the key is already in the skip list and ``unskip`` is
        not set. None in any other case
    """
    uid = instance.UID
    if callable(uid):
        # Explicit branch instead of `callable(x) and x() or x`: that idiom
        # falls back to the bound method itself when UID() returns a falsy
        # value, producing a bogus skip key
        uid = uid()
    skipkey = "%s_%s" % (uid, action)

    request = instance.REQUEST
    # The key is only registered on a plain call (neither peek nor unskip)
    register = not peek and not unskip

    if 'workflow_skiplist' not in request:
        if register:
            request['workflow_skiplist'] = [skipkey, ]
        return None

    skiplist = request['workflow_skiplist']
    if skipkey in skiplist:
        if not unskip:
            return True
        skiplist.remove(skipkey)
    elif register:
        skiplist.append(skipkey)
    return None
74
75
76
def doActionFor(instance, action_id):
    """Tries to perform the transition to the instance.

    Object is reindexed after the transition takes place, but only if succeeds.

    :param instance: Object to be transitioned. If a list is passed in, only
        its first item is transitioned
    :param action_id: transition id
    :returns: True if the transition has been performed, together with message
    :rtype: tuple (bool,str)
    """
    if not instance:
        return False, ""

    if isinstance(instance, list):
        # TODO Workflow . Check if this is strictly necessary
        # This check is here because sometimes Plone creates a list
        # from submitted form elements.
        logger.warning("Got a list of obj in doActionFor!")
        if len(instance) > 1:
            logger.warning(
                "doActionFor is getting an instance parameter which is a list "
                "with more than one item. Instance: '{}', action_id: '{}'"
                .format(instance, action_id)
            )

        return doActionFor(instance=instance[0], action_id=action_id)

    # Since a given transition can cascade or promote to other objects, we want
    # to reindex all objects for which the transition succeed at once, at the
    # end of process. Otherwise, same object will be reindexed multiple times
    # unnecessarily.
    # Also, ActionsHandlerPool ensures the same transition is not applied twice
    # to the same object due to cascade/promote recursions.
    pool = ActionHandlerPool.get_instance()
    if pool.succeed(instance, action_id):
        return False, "Transition {} for {} already done"\
             .format(action_id, instance.getId())

    # Return False if transition is not permitted
    if not isTransitionAllowed(instance, action_id):
        return False, "Transition {} for {} is not allowed"\
            .format(action_id, instance.getId())

    # Add this batch process to the queue
    pool.queue_pool()
    succeed = False
    message = ""
    workflow = getToolByName(instance, "portal_workflow")
    try:
        workflow.doActionFor(instance, action_id)
        succeed = True
    except WorkflowException as e:
        # The failure is reported through the returned message, not re-raised
        message = str(e)
        curr_state = getCurrentState(instance)
        clazz_name = instance.__class__.__name__
        logger.warning(
            "Transition '{0}' not allowed: {1} '{2}' ({3})"
            .format(action_id, clazz_name, instance.getId(), curr_state))
        logger.error(message)

    # Add the current object to the pool and resume
    pool.push(instance, action_id, succeed)
    pool.resume()

    return succeed, message
139
140
141
def call_workflow_event(instance, event, after=True):
    """Delegates the workflow event to the function of the instance's events
    module that matches with the transition, if any.

    The module is resolved with ``_load_wf_module('<portal_type>.events')``
    and the function looked up is ``after_<transition_id>`` when ``after`` is
    true, ``before_<transition_id>`` otherwise.

    :param instance: the object the event refers to
    :param event: event that holds the transition
    :param after: whether the "after" or the "before" function is called
    :returns: True if a function was found and called, False otherwise
    :rtype: bool
    """
    transition = event.transition
    if not transition:
        return False

    type_name = instance.portal_type.lower()
    events_module = _load_wf_module('{}.events'.format(type_name))
    if not events_module:
        return False

    # Inspect if <before|after>_<transition_id> function exists in the module
    if after:
        handler_name = "after_{}".format(transition.id)
    else:
        handler_name = "before_{}".format(transition.id)
    handler = getattr(events_module, handler_name, False)
    if not handler:
        return False

    logger.info('WF event: {0}.events.{1}'.format(type_name, handler_name))
    handler(instance)
    return True
163
164
165
def BeforeTransitionEventHandler(instance, event):
    """Handler executed before each transition. Delegates to the function
    'workflow.<portal_type>.events.before_<transition_id>', if it exists for
    the instance passed in.

    :param instance: the instance to be transitioned
    :type instance: ATContentType
    :param event: event that holds the transition to be performed
    :type event: IObjectEvent
    """
    # Third argument is "after": False selects the before_* function. The
    # outcome of the dispatch is not used here
    call_workflow_event(instance, event, False)
175
176
177
def AfterTransitionEventHandler(instance, event):
    """Handler executed after each transition. Delegates to the function
    'workflow.<portal_type>.events.after_<transition_id>', if it exists for
    the instance passed in. Otherwise, falls back to the legacy methods of
    the instance itself.

    :param instance: the instance that has been transitioned
    :type instance: ATContentType
    :param event: event that holds the transition performed
    :type event: IObjectEvent
    """
    if call_workflow_event(instance, event, after=True):
        return

    # Try with old AfterTransitionHandler dance...
    # TODO REMOVE AFTER PORTING workflow_script_*/*_transition_event
    transition = event.transition
    if not transition:
        return

    transition_id = transition.id
    # Set the request variable preventing cascades from re-transitioning.
    if skip(instance, transition_id):
        return

    # At this point the object has been transitioned already, but further
    # actions are probably needed still, so be sure it is reindexed before
    # going forward.
    instance.reindexObject()

    legacy_handler = getattr(
        instance, 'after_{0}_transition_event'.format(transition_id), False)
    if not legacy_handler:
        # TODO Workflow. this fallback is only for backwards compatibility,
        # to be removed when all workflow_script_* methods in contents are
        # replaced by the more explicit signature 'after_*_transition_event'
        legacy_handler = getattr(
            instance, 'workflow_script_' + transition_id, False)

    if legacy_handler:
        legacy_handler()
211
212
213
def get_workflow_actions(obj):
    """Compile a list of possible workflow transitions for this object

    :param obj: the object to get the allowed transitions from
    :returns: a list of dicts with keys 'id' (transition id) and 'title'
        (translation of the message "<transition id>_transition_title")
    :rtype: list
    """
    actions = []
    for transition_id in getAllowedTransitions(obj):
        title = t(PMF(transition_id + "_transition_title"))
        actions.append({'id': transition_id, 'title': title})
    return actions
223
224
225
def isTransitionAllowed(instance, transition_id):
    """Checks if the object can perform the transition passed in.

    :param instance: the object to check the transition against
    :param transition_id: the id of the transition
    :returns: True if any workflow from the instance's chain supports the
        transition
    :rtype: bool
    """
    portal_workflow = getToolByName(instance, "portal_workflow")
    # Generators keep the evaluation lazy: workflows are resolved one by one
    # and the search stops at the first one that supports the transition
    workflows = (
        portal_workflow.getWorkflowById(workflow_id)
        for workflow_id in portal_workflow.getChainFor(instance)
    )
    return any(
        workflow and workflow.isActionSupported(instance, transition_id)
        for workflow in workflows
    )
237
238
239
def getAllowedTransitions(instance):
    """Returns the ids of the transitions that the workflow tool reports for
    the instance passed in.

    :param instance: A content object
    :type instance: ATContentType
    :returns: A list of transition/action ids
    :rtype: list
    """
    portal_workflow = getToolByName(instance, "portal_workflow")
    transition_ids = []
    for transition in portal_workflow.getTransitionsFor(instance):
        transition_ids.append(transition['id'])
    return transition_ids
250
251
252
def get_review_history_statuses(instance, reverse=False):
    """Returns a list with the statuses of the instance from the review_history

    :param instance: the object to get the review history from
    :param reverse: passed through to getReviewHistory
    :returns: the value of "review_state" of each review history event
    :rtype: list
    """
    review_history = getReviewHistory(instance, reverse=reverse)
    # List comprehension instead of map(): always returns a real list, as
    # documented, also when map() is lazy (Python 3)
    return [event["review_state"] for event in review_history]
257
258
259
def getReviewHistory(instance, reverse=True):
    """Returns the review history for the instance

    :param instance: the object to get the review history from
    :param reverse: forwarded to api.get_review_history as ``rev``
    :returns: the list of historic events as dicts
    """
    history = api.get_review_history(instance, rev=reverse)
    return history
264
265
266
def getCurrentState(obj, stateflowid='review_state'):
    """Returns the current state of the object for the state flow id
    specified, as given by api.get_workflow_status_of

    Return empty if there's no workflow state for the object and flow id

    :param obj: the object to get the state from
    :param stateflowid: the id of the state variable
    """
    current_state = api.get_workflow_status_of(obj, stateflowid)
    return current_state
271
272
273
def in_state(obj, states, stateflowid='review_state'):
    """Returns whether the current state of the object is one of the states
    passed in

    :param obj: the object to check the state of
    :param states: container of state ids. If empty or None, returns False
        without looking at the object
    :param stateflowid: the id of the state variable
    :rtype: bool
    """
    if states:
        return getCurrentState(obj, stateflowid=stateflowid) in states
    return False
280
281
282
def getTransitionActor(obj, action_id):
    """Returns the actor that performed a given transition. If transition has
    not been performed, or current user has no privileges, returns None

    :param obj: the object to get the review history from
    :param action_id: the id of the transition
    :return: the username of the user that performed the transition passed-in
    :type: string
    """
    actors = (
        event.get('actor')
        for event in getReviewHistory(obj)
        if event.get('action') == action_id
    )
    # First match in the order given by getReviewHistory, None if no match
    return next(actors, None)
293
294
295
def getTransitionDate(obj, action_id, return_as_datetime=False):
    """Returns date of action for object.

    :param obj: the object to get the review history from
    :param action_id: the id of the transition
    :param return_as_datetime: if True, the 'time' value of the first
        matching event is returned as stored. Otherwise, the value is
        formatted with ulocalized_time
    :returns: the date of the transition or None if there is no match
    """
    for entry in getReviewHistory(obj):
        if entry.get('action') != action_id:
            continue

        when = entry.get('time')
        if return_as_datetime:
            return when

        if not when:
            # No time to format: keep looking for another matching event
            continue

        return ulocalized_time(when, long_format=True, time_only=False,
                               context=obj)
    return None
311
312
313
def getTransitionUsers(obj, action_id, last_user=False):
    """Returns a list with the users who have done the transition.

    :param obj: the object to get the review history from
    :param action_id: a string as the transition id.
    :param last_user: a boolean to return only the last user triggering the
        transition or all of them.
    :returns: a list of user ids.
    """
    users = []
    portal_workflow = getToolByName(obj, 'portal_workflow')
    try:
        # https://jira.bikalabs.com/browse/LIMS-2242:
        # Sometimes the workflow history is inexplicably missing!
        history = list(portal_workflow.getInfoFor(obj, 'review_history'))
    except WorkflowException:
        logger.error(
            "workflow history is inexplicably missing."
            " https://jira.bikalabs.com/browse/LIMS-2242")
        return users

    # walk the history backwards, so we always see the most recent matching
    # event first
    for entry in reversed(history):
        if entry.get('action', '') != action_id:
            continue
        users.append(entry.get('actor', ''))
        if last_user:
            break
    return users
341
342
343
def guard_handler(instance, transition_id):
    """Generic workflow guard handler that returns true if the transition_id
    passed in can be performed to the instance passed in.

    This function is called automatically by a Script (Python) located at
    bika/lims/skins/guard_handler.py, which in turn is fired by Zope when an
    expression like "python:here.guard_handler('<transition_id>')" is set to
    any given guard (used by default in all bika's DC Workflow guards).

    Adapters registered for IGuardAdapter are evaluated first: if any of
    them returns False, the transition is not allowed. Then, looks in
    bika.lims.workflow.<obj_type>.guards for a function that matches with
    'guard_<transition_id>'. If found, calls the function and returns its
    value. If not found, returns True by default.

    :param instance: the object for which the transition_id has to be evaluated
    :param transition_id: the id of the transition
    :type instance: ATContentType
    :type transition_id: string
    :return: true if the transition can be performed to the passed in instance
    :rtype: bool
    """
    if not instance:
        return True

    # Core's guard is only evaluated if no "pre-guard" adapter returns an
    # explicit False (other falsy values do not veto the transition)
    adapters = getAdapters((instance,), IGuardAdapter)
    if any(adapter.guard(transition_id) is False for _, adapter in adapters):
        return False

    # Inspect if bika.lims.workflow.<portal_type>.guards module exists
    type_name = instance.portal_type
    guards_module = _load_wf_module('{0}.guards'.format(type_name.lower()))
    if not guards_module:
        return True

    # Inspect if guard_<transition_id> function exists in the above module
    guard_func = getattr(
        guards_module, 'guard_{0}'.format(transition_id), False)
    if guard_func:
        return guard_func(instance)
    return True
385
386
387
def _load_wf_module(module_relative_name):
388
    """Loads a python module based on the module relative name passed in.
389
390
    At first, tries to get the module from sys.modules. If not found there, the
391
    function tries to load it by using importlib. Returns None if no module
392
    found or importlib is unable to load it because of errors.
393
    Eg:
394
        _load_wf_module('sample.events')
395
396
    will try to load the module 'bika.lims.workflow.sample.events'
397
398
    :param modrelname: relative name of the module to be loaded
399
    :type modrelname: string
400
    :return: the module
401
    :rtype: module
402
    """
403
    if not module_relative_name:
404
        return None
405
    if not isinstance(module_relative_name, six.string_types):
406
        return None
407
408
    rootmodname = __name__
409
    modulekey = '{0}.{1}'.format(rootmodname, module_relative_name)
410
    if modulekey in sys.modules:
411
        return sys.modules.get(modulekey, None)
412
413
    # Try to load the module recursively
414
    modname = None
415
    tokens = module_relative_name.split('.')
416
    for part in tokens:
417
        modname = '.'.join([modname, part]) if modname else part
418
        import importlib
419
        try:
420
            _module = importlib.import_module('.'+modname, package=rootmodname)
421
            if not _module:
422
                return None
423
        except Exception:
424
            return None
425
    return sys.modules.get(modulekey, None)
426
427
428
class JSONReadExtender(object):
    """Adapter that adds the list of possible transitions to the data of an
    object, if 'transitions' is specified in the include_fields or if no
    include_fields are specified at all.
    """

    implements(IJSONReadExtender)

    def __init__(self, context):
        # The object the transitions are computed for
        self.context = context

    def __call__(self, request, data):
        """Sets data['transitions'] in place, if applicable

        :param request: the request to read the include fields from
        :param data: dict-like the transitions are stored into
        """
        requested = get_include_fields(request)
        if requested and "transitions" not in requested:
            # Specific fields were requested and transitions is not one of them
            return
        data['transitions'] = get_workflow_actions(self.context)
443
444
445
class ActionHandlerPool(object):
    """Singleton to handle concurrent transitions

    Keeps track of the objects that have been transitioned, so they can be
    reindexed at once on resume. The state of the pool (objects and number
    of pending calls) is stored in the current HTTP request, under the key
    "__action_handler_pool". When there is no HTTP request, no state is kept
    between calls and objects are reindexed as soon as they are pushed.
    """
    implements(IActionHandlerPool)

    # The single instance of this class, created lazily by get_instance()
    __instance = None
    # Guards the creation of the single instance
    __lock = threading.Lock()

    @staticmethod
    def get_instance():
        """Returns the current instance of ActionHandlerPool

        TODO: Refactor to global utility
        """
        if ActionHandlerPool.__instance is None:
            # Thread-safe: check again once the lock is held, another thread
            # might have created the instance in the meantime
            with ActionHandlerPool.__lock:
                if ActionHandlerPool.__instance is None:
                    # __init__ stores the new instance in __instance
                    ActionHandlerPool()
        return ActionHandlerPool.__instance

    def __init__(self):
        """Registers itself as the single instance

        :raises Exception: if an instance exists already
        """
        if ActionHandlerPool.__instance is not None:
            raise Exception("Use ActionHandlerPool.get_instance()")
        ActionHandlerPool.__instance = self

    def __len__(self):
        """Number of objects in the pool
        """
        return len(self.objects)

    def __repr__(self):
        """Better repr
        """
        return "<ActionHandlerPool for UIDs:[{}]>".format(
            ",".join(map(api.get_uid, self.objects)))

    def flush(self):
        """Resets the pool: no objects and no pending calls
        """
        self.objects = collections.OrderedDict()
        self.num_calls = 0

    @property
    def request(self):
        """The current request, or None if it is not an HTTPRequest
        """
        request = api.get_request()
        if not isinstance(request, HTTPRequest):
            return None
        return request

    @property
    def request_ahp(self):
        """The dict with the state of the pool ("objects" and "num_calls")
        stored in the current request. It is created on first access.

        If there is no HTTP request, a new empty state is returned on each
        access, so changes made to it are not kept.
        """
        data = {
            "objects": collections.OrderedDict(),
            "num_calls": 0
        }

        request = self.request
        if request is None:
            # Maybe this is called by a non-request script
            return data

        if "__action_handler_pool" not in request:
            request["__action_handler_pool"] = data
        return request["__action_handler_pool"]

    @property
    def objects(self):
        """Mapping of UID -> {action: {"success": bool}} of pushed objects
        """
        return self.request_ahp["objects"]

    @objects.setter
    def objects(self, value):
        self.request_ahp["objects"] = value

    @property
    def num_calls(self):
        """Number of batches queued with queue_pool and not yet resumed
        """
        return self.request_ahp["num_calls"]

    @num_calls.setter
    def num_calls(self, value):
        self.request_ahp["num_calls"] = value

    @synchronized(max_connections=1)
    def queue_pool(self):
        """Notifies that a new batch of jobs is about to begin
        """
        self.num_calls += 1

    @synchronized(max_connections=1)
    def push(self, instance, action, success):
        """Adds an instance into the pool, to be reindexed on resume

        :param instance: the object the action was applied to
        :param action: the id of the action
        :param success: whether the action took place successfully
        """
        if self.request is None:
            # This is called by a non-request script: there is nowhere to
            # keep the pool, so reindex straight away
            instance.reindexObject()
            return

        uid = api.get_uid(instance)
        info = self.objects.get(uid, {})
        info[action] = {"success": success}
        self.objects[uid] = info

    def succeed(self, instance, action):
        """Returns if the task for the instance took place successfully
        """
        uid = api.get_uid(instance)
        return self.objects.get(uid, {}).get(action, {}).get("success", False)

    @synchronized(max_connections=1)
    def resume(self):
        """Resumes the pool and reindex all objects processed

        Objects are only reindexed (and the pool flushed) when there are no
        more pending calls.
        """
        # do not decrease the counter below 0
        if self.num_calls > 0:
            self.num_calls -= 1

        # postpone for pending calls
        if self.num_calls > 0:
            return

        # return immediately if there are no objects in the queue
        count = len(self)
        if count == 0:
            return

        logger.info("Resume actions for {} objects".format(count))

        # Fetch the objects from the pool. Search only once and reuse the
        # result for both the reindex and the summary below
        query = {"UID": self.objects.keys()}
        brains = api.search(query, UID_CATALOG)
        for brain in brains:
            # Reindex the object
            obj = api.get_object(brain)
            obj.reindexObject()

        # Cleanup the pool
        logger.info("Objects processed: {}".format(len(brains)))
        self.flush()
581
582
583
def push_reindex_to_actions_pool(obj):
    """Push a reindex job to the actions handler pool

    The object is registered under the action "reindex", flagged as
    successful.

    :param obj: the object to be pushed to the pool
    """
    ActionHandlerPool.get_instance().push(obj, "reindex", success=True)
588