Passed
Push — master ( 2035c6...7cdfd5 )
by Jordi
05:03
created

bika.lims.workflow.ActionHandlerPool.request_ahp()   A

Complexity

Conditions 3

Size

Total Lines 15
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 11
dl 0
loc 15
rs 9.85
c 0
b 0
f 0
cc 3
nop 1
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2019 by its authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import collections
22
import sys
23
import threading
24
25
from AccessControl.SecurityInfo import ModuleSecurityInfo
26
from bika.lims import PMF
27
from bika.lims import api
28
from bika.lims import logger
29
from bika.lims.browser import ulocalized_time
30
from bika.lims.decorators import synchronized
31
from bika.lims.interfaces import IActionHandlerPool
32
from bika.lims.interfaces import IJSONReadExtender
33
from bika.lims.jsonapi import get_include_fields
34
from bika.lims.utils import changeWorkflowState  # noqa
35
from bika.lims.utils import t
36
from bika.lims.workflow.indexes import ACTIONS_TO_INDEXES
37
from Products.Archetypes.config import UID_CATALOG
38
from Products.CMFCore.utils import getToolByName
39
from Products.CMFCore.WorkflowCore import WorkflowException
40
from zope.interface import implements
41
from ZPublisher.HTTPRequest import HTTPRequest
42
43
# Security declarations for this module. guard_handler is declared public;
# presumably this is what allows the Script (Python) mentioned in its docstring
# to call it from restricted code -- confirm against AccessControl docs.
security = ModuleSecurityInfo('bika.lims.workflow')
security.declarePublic('guard_handler')

# Sentinel used as the default of optional arguments, to tell "argument not
# passed" apart from an explicit None (see ActionHandlerPool.push)
_marker = object()
47
48
49
def skip(instance, action, peek=False, unskip=False):
    """Returns True if the transition is to be SKIPPED

    Keeps a list of "<uid>_<action>" keys under 'workflow_skiplist' in the
    REQUEST of the instance. Called with only (instance, action), registers
    the key in that list (preventing cascades from re-transitioning the
    object) and returns None.

    :param instance: object that provides UID (attribute or method) and REQUEST
    :param action: id of the transition
    :param peek: True just checks the value, does not set
    :param unskip: True removes the skip key (for manual overrides)
    :returns: True if the key was already registered and unskip is not set.
        None otherwise
    """
    # Conditional expression instead of "and/or": the latter falls back to the
    # bound method itself when UID() returns a falsy value
    uid = instance.UID() if callable(instance.UID) else instance.UID
    skipkey = "%s_%s" % (uid, action)
    request = instance.REQUEST

    if 'workflow_skiplist' not in request:
        if not peek and not unskip:
            request['workflow_skiplist'] = [skipkey, ]
        return None

    skiplist = request['workflow_skiplist']
    if skipkey in skiplist:
        if unskip:
            skiplist.remove(skipkey)
            return None
        return True

    if not peek and not unskip:
        skiplist.append(skipkey)
    return None
73
74
75
def doActionFor(instance, action_id, idxs=None):
    """Tries to perform the transition to the instance.

    Object is reindexed after the transition takes place, but only if succeeds.
    If idxs is set, only these indexes will be reindexed. Otherwise, will try
    to use the indexes defined in ACTIONS_TO_INDEXES mapping if any.

    :param instance: Object to be transitioned. If a non-empty list is passed
        in, only its first item is transitioned
    :param action_id: transition id
    :param idxs: indexes to be reindexed after the transition
    :returns: True if the transition has been performed, together with message
    :rtype: tuple (bool,str)
    """
    if not instance:
        return False, ""

    if isinstance(instance, list):
        # TODO Workflow . Check if this is strictly necessary
        # This check is here because sometimes Plone creates a list
        # from submitted form elements.
        logger.warning("Got a list of obj in doActionFor!")
        if len(instance) > 1:
            logger.warning(
                "doActionFor is getting an instance parameter which is a list "
                "with more than one item. Instance: '{}', action_id: '{}'"
                .format(instance, action_id)
            )

        return doActionFor(
            instance=instance[0], action_id=action_id, idxs=idxs)

    # Since a given transition can cascade or promote to other objects, we want
    # to reindex all objects for which the transition succeed at once, at the
    # end of process. Otherwise, same object will be reindexed multiple times
    # unnecessarily.
    # Also, ActionsHandlerPool ensures the same transition is not applied twice
    # to the same object due to cascade/promote recursions.
    pool = ActionHandlerPool.get_instance()
    if pool.succeed(instance, action_id):
        return False, "Transition {} for {} already done"\
             .format(action_id, instance.getId())

    # Return False if transition is not permitted
    if not isTransitionAllowed(instance, action_id):
        return False, "Transition {} for {} is not allowed"\
            .format(action_id, instance.getId())

    # Add this batch process to the queue
    pool.queue_pool()
    succeed = False
    message = ""
    workflow = getToolByName(instance, "portal_workflow")
    try:
        workflow.doActionFor(instance, action_id)
        succeed = True
    except WorkflowException as e:
        message = str(e)
        curr_state = getCurrentState(instance)
        clazz_name = instance.__class__.__name__
        logger.warning(
            "Transition '{0}' not allowed: {1} '{2}' ({3})"
            .format(action_id, clazz_name, instance.getId(), curr_state))
        logger.error(message)

    # If no indexes to reindex have been defined, try to use those defined in
    # the ACTIONS_TO_INDEXES mapping. Reindexing only those indexes that might
    # be affected by the transition boosts the overall performance!.
    if idxs is None:
        portal_type = instance.portal_type
        idxs = ACTIONS_TO_INDEXES.get(portal_type, {}).get(action_id, [])

    # Add the current object to the pool (regardless of the outcome of the
    # transition) and resume
    pool.push(instance, action_id, succeed, idxs=idxs)
    pool.resume()

    return succeed, message
149
150
151
def call_workflow_event(instance, event, after=True):
    """Calls the instance's workflow event

    Looks for a function named '<after|before>_<transition_id>' in the module
    '<portal_type>.events' (portal type in lowercase, resolved through
    _load_wf_module) and, if found, calls it with the instance.

    :param instance: the object the event refers to
    :param event: event that holds the transition
    :param after: True to look for 'after_<transition_id>', False to look for
        'before_<transition_id>'
    :returns: True if a function for the event was found and called
    :rtype: bool
    """
    if not event.transition:
        return False

    portal_type = instance.portal_type
    wf_module = _load_wf_module('{}.events'.format(portal_type.lower()))
    if not wf_module:
        return False

    # Inspect if <prefix>_<transition_id> function exists in the module
    prefix = "after" if after else "before"
    func_name = "{}_{}".format(prefix, event.transition.id)
    func = getattr(wf_module, func_name, None)
    if not func:
        return False

    logger.info('WF event: {0}.events.{1}'
                .format(portal_type.lower(), func_name))
    func(instance)
    return True
173
174
175
def BeforeTransitionEventHandler(instance, event):
    """This event is executed before each transition and delegates further
    actions to the 'workflow.<portal_type>.events.before_<transition_id>'
    function, if it exists for the instance passed in.

    :param instance: the instance to be transitioned
    :type instance: ATContentType
    :param event: event that holds the transition to be performed
    :type event: IObjectEvent
    """
    call_workflow_event(instance, event, after=False)
185
186
187
def AfterTransitionEventHandler(instance, event):
    """This event is executed after each transition and delegates further
    actions to the 'workflow.<portal_type>.events.after_<transition_id>'
    function, if it exists for the instance passed in.

    If no such function is found, falls back to the legacy methods of the
    instance itself: 'after_<transition_id>_transition_event' first, then
    'workflow_script_<transition_id>'.

    :param instance: the instance that has been transitioned
    :type instance: ATContentType
    :param event: event that holds the transition performed
    :type event: IObjectEvent
    """
    if call_workflow_event(instance, event, after=True):
        return

    # Try with old AfterTransitionHandler dance...
    # TODO REMOVE AFTER PORTING workflow_script_*/*_transition_event
    if not event.transition:
        return

    transition_id = event.transition.id

    # Set the request variable preventing cascade's from re-transitioning.
    if skip(instance, transition_id):
        return

    # Because at this point, the object has been transitioned already, but
    # further actions are probably needed still, so be sure is reindexed
    # before going forward.
    instance.reindexObject()

    key = 'after_{0}_transition_event'.format(transition_id)
    after_event = getattr(instance, key, None)
    if not after_event:
        # TODO Workflow. this conditional is only for backwards compatibility,
        # to be removed when all workflow_script_* methods in contents are
        # replaced by the more explicit signature 'after_*_transition_event'
        key = 'workflow_script_' + transition_id
        after_event = getattr(instance, key, None)
    if not after_event:
        return
    after_event()
221
222
223
def get_workflow_actions(obj):
    """Compile a list of possible workflow transitions for this object

    :param obj: the object to get the allowed transitions from
    :returns: a list of dicts, one per allowed transition, with the keys 'id'
        (transition id) and 'title' (translation of the message
        '<transition_id>_transition_title')
    :rtype: list
    """

    def translate(transition_id):
        # Parameter is not named "id" to avoid shadowing the builtin
        return t(PMF(transition_id + "_transition_title"))

    return [{'id': transition_id, 'title': translate(transition_id)}
            for transition_id in getAllowedTransitions(obj)]
233
234
235
def isTransitionAllowed(instance, transition_id):
    """Checks if the object can perform the transition passed in.

    The transition is considered allowed as soon as one of the workflows from
    the workflow chain of the instance reports the action as supported.

    :param instance: the object to check the transition against
    :param transition_id: the id of the transition
    :returns: True if transition can be performed
    :rtype: bool
    """
    wf_tool = getToolByName(instance, "portal_workflow")
    for wf_id in wf_tool.getChainFor(instance):
        wf = wf_tool.getWorkflowById(wf_id)
        if wf and wf.isActionSupported(instance, transition_id):
            return True

    return False
247
248
249
def getAllowedTransitions(instance):
    """Returns a list with the transition ids that can be performed against
    the instance passed in.

    :param instance: A content object
    :type instance: ATContentType
    :returns: A list of transition/action ids
    :rtype: list
    """
    wftool = getToolByName(instance, "portal_workflow")
    return [trans['id'] for trans in wftool.getTransitionsFor(instance)]
260
261
262
def get_review_history_statuses(instance, reverse=False):
    """Returns a list with the statuses of the instance from the review_history

    :param instance: the object to get the review history from
    :param reverse: passed through to getReviewHistory
    :returns: the value of "review_state" of each event from the history
    :rtype: list
    """
    review_history = getReviewHistory(instance, reverse=reverse)
    # List comprehension instead of map(), so a real list is returned
    # regardless of the Python version
    return [event["review_state"] for event in review_history]
267
268
269
def get_prev_status_from_history(instance, status=None):
    """Returns the previous status of the object. If status is set, returns the
    previous status before the object reached the status passed in.
    If instance has reached the status passed in more than once, only the last
    one is considered.

    :param instance: the object to get the review history from
    :param status: the status of reference. If not set, the current workflow
        status of the instance is used
    :returns: the status that follows the first occurrence of the status of
        reference in the history retrieved with reverse=True, or None if the
        status is not in the history or is its last item
    """
    target = status or api.get_workflow_status_of(instance)
    history = getReviewHistory(instance, reverse=True)
    # Build a real list: index() is not available on the iterator that map()
    # returns in Python 3
    statuses = [event["review_state"] for event in history]
    if target not in statuses:
        return None

    prev_idx = statuses.index(target) + 1
    if prev_idx >= len(statuses):
        # The status of reference is the last item, nothing comes after it
        return None
    return statuses[prev_idx]
281
282
283
def getReviewHistory(instance, reverse=True):
    """Returns the review history for the instance

    Thin wrapper around api.get_review_history.

    :param instance: the object to get the review history from
    :param reverse: passed to api.get_review_history as 'rev'
    :returns: the list of historic events as dicts
    """
    return api.get_review_history(instance, rev=reverse)
288
289
290
def getCurrentState(obj, stateflowid='review_state'):
    """The current state of the object for the state flow id specified

    Thin wrapper around api.get_workflow_status_of.

    :param obj: the object to get the state from
    :param stateflowid: the id of the state variable
    :returns: whatever api.get_workflow_status_of returns for the object and
        flow id (empty if there is no workflow state, per the original note)
    """
    return api.get_workflow_status_of(obj, stateflowid)
295
296
297
def in_state(obj, states, stateflowid='review_state'):
    """Returns if the object passed matches with the states passed in

    :param obj: the object to get the current state from
    :param states: container of state ids to check against
    :param stateflowid: the id of the state variable
    :returns: True if the current state of the object is in states. False if
        it is not or if states is empty or None
    :rtype: bool
    """
    if not states:
        # Nothing to match against, do not even look up the current state
        return False
    return getCurrentState(obj, stateflowid=stateflowid) in states
304
305
306
def getTransitionActor(obj, action_id):
    """Returns the actor that performed a given transition. If transition has
    not been performed, or current user has no privileges, returns None

    :param obj: the object to get the review history from
    :param action_id: the id of the transition
    :return: the 'actor' of the first event from the review history with the
        action passed in, or None if there is no such event
    :type: string
    """
    for event in getReviewHistory(obj):
        if event.get('action') == action_id:
            return event.get('actor')
    return None
317
318
319
def getTransitionDate(obj, action_id, return_as_datetime=False):
    """Returns date of action for object. Sometimes we need this date in
    Datetime format and that's why added return_as_datetime param.

    :param obj: the object to get the review history from
    :param action_id: the id of the transition
    :param return_as_datetime: if True, the 'time' value of the first matching
        event is returned as stored in the history, without formatting
    :returns: the 'time' of the event, either raw or formatted through
        ulocalized_time. None if there is no event for the action
    """
    for event in getReviewHistory(obj):
        if event.get('action') != action_id:
            continue

        evtime = event.get('time')
        if return_as_datetime:
            return evtime
        if evtime:
            return ulocalized_time(evtime, long_format=True,
                                   time_only=False, context=obj)
        # Matching event without time: keep looking in the next events
    return None
335
336
337
def getTransitionUsers(obj, action_id, last_user=False):
    """This function returns a list with the users who have done the
    transition.

    :param obj: the object to get the review history from
    :param action_id: a string as the transition id.
    :param last_user: a boolean to return only the last user triggering the
        transition or all of them.
    :returns: a list of user ids, from the last event of the review history
        to the first. Empty if the review history cannot be retrieved
    """
    workflow = getToolByName(obj, 'portal_workflow')
    try:
        # https://jira.bikalabs.com/browse/LIMS-2242:
        # Sometimes the workflow history is inexplicably missing!
        review_history = list(workflow.getInfoFor(obj, 'review_history'))
    except WorkflowException:
        logger.error(
            "workflow history is inexplicably missing."
            " https://jira.bikalabs.com/browse/LIMS-2242")
        return []

    users = []
    # walk the list backwards, so we always see the most recent matching event
    for event in reversed(review_history):
        if event.get('action', '') == action_id:
            users.append(event.get('actor', ''))
            if last_user:
                break
    return users
365
366
367
def guard_handler(instance, transition_id):
    """Generic workflow guard handler that returns true if the transition_id
    passed in can be performed to the instance passed in.

    This function is called automatically by a Script (Python) located at
    bika/lims/skins/guard_handler.py, which in turn is fired by Zope when an
    expression like "python:here.guard_handler('<transition_id>')" is set to
    any given guard (used by default in all bika's DC Workflow guards).

    Walks through bika.lims.workflow.<obj_type>.guards and looks for a function
    that matches with 'guard_<transition_id>'. If found, calls the function and
    returns its value (true or false). If not found, returns True by default.

    :param instance: the object for which the transition_id has to be evaluated
    :param transition_id: the id of the transition
    :type instance: ATContentType
    :type transition_id: string
    :return: true if the transition can be performed to the passed in instance
    :rtype: bool
    """
    if not instance:
        return True

    # Inspect if bika.lims.workflow.<portal_type>.guards module exists
    portal_type = instance.portal_type
    wf_module = _load_wf_module('{0}.guards'.format(portal_type.lower()))
    if not wf_module:
        return True

    # Inspect if guard_<transition_id> function exists in the above module
    guard = getattr(wf_module, 'guard_{0}'.format(transition_id), None)
    if not guard:
        return True

    return guard(instance)
402
403
404
def _load_wf_module(module_relative_name):
405
    """Loads a python module based on the module relative name passed in.
406
407
    At first, tries to get the module from sys.modules. If not found there, the
408
    function tries to load it by using importlib. Returns None if no module
409
    found or importlib is unable to load it because of errors.
410
    Eg:
411
        _load_wf_module('sample.events')
412
413
    will try to load the module 'bika.lims.workflow.sample.events'
414
415
    :param modrelname: relative name of the module to be loaded
416
    :type modrelname: string
417
    :return: the module
418
    :rtype: module
419
    """
420
    if not module_relative_name:
421
        return None
422
    if not isinstance(module_relative_name, basestring):
423
        return None
424
425
    rootmodname = __name__
426
    modulekey = '{0}.{1}'.format(rootmodname, module_relative_name)
427
    if modulekey in sys.modules:
428
        return sys.modules.get(modulekey, None)
429
430
    # Try to load the module recursively
431
    modname = None
432
    tokens = module_relative_name.split('.')
433
    for part in tokens:
434
        modname = '.'.join([modname, part]) if modname else part
435
        import importlib
436
        try:
437
            _module = importlib.import_module('.'+modname, package=rootmodname)
438
            if not _module:
439
                return None
440
        except Exception:
441
            return None
442
    return sys.modules.get(modulekey, None)
443
444
445
class JSONReadExtender(object):
    """Adds the list of possible transitions to each object, if 'transitions'
    is specified in the include_fields or if no include_fields are specified
    at all.
    """

    implements(IJSONReadExtender)

    def __init__(self, context):
        # The object the transitions are retrieved for
        self.context = context

    def __call__(self, request, data):
        """Stores the workflow actions of the context under the key
        'transitions' of the data dict passed in (modified in place)
        """
        include_fields = get_include_fields(request)
        if not include_fields or "transitions" in include_fields:
            data['transitions'] = get_workflow_actions(self.context)
460
461
462
class ActionHandlerPool(object):
    """Singleton to handle concurrent transitions

    Collects the objects processed while transitions are taking place, so
    they get reindexed only once, when the last pending call resumes. The
    state of the pool (objects and number of pending calls) is not stored in
    the singleton itself, but in the current request, under the key
    "__action_handler_pool".
    """
    implements(IActionHandlerPool)

    __instance = None
    __lock = threading.Lock()

    @staticmethod
    def get_instance():
        """Returns the current instance of ActionHandlerPool

        TODO: Refactor to global utility
        """
        if ActionHandlerPool.__instance is None:
            # Thread-safe
            with ActionHandlerPool.__lock:
                if ActionHandlerPool.__instance is None:
                    # __init__ registers the new object as the instance
                    ActionHandlerPool()
        return ActionHandlerPool.__instance

    def __init__(self):
        if ActionHandlerPool.__instance is not None:
            raise Exception("Use ActionHandlerPool.get_instance()")
        ActionHandlerPool.__instance = self

    def __len__(self):
        """Number of objects in the pool
        """
        return len(self.objects)

    def __repr__(self):
        """Better repr
        """
        # The keys of the objects mapping are UIDs already (see push)
        return "<ActionHandlerPool for UIDs:[{}]>".format(
            ",".join(self.objects.keys()))

    def flush(self):
        """Empties the pool and resets the counter of pending calls
        """
        self.objects = collections.OrderedDict()
        self.num_calls = 0

    @property
    def request(self):
        """The current request, or None if it is not an HTTPRequest
        """
        request = api.get_request()
        if not isinstance(request, HTTPRequest):
            return None
        return request

    @property
    def request_ahp(self):
        """The dict with the state of the pool ("objects" and "num_calls")
        stored in the current request. If there is no request, a new dict is
        returned on each access, so nothing persists between calls.
        """
        data = {
            "objects": collections.OrderedDict(),
            "num_calls": 0
        }

        request = self.request
        if request is None:
            # Maybe this is called by a non-request script
            return data

        if "__action_handler_pool" not in request:
            request["__action_handler_pool"] = data
        return request["__action_handler_pool"]

    @property
    def objects(self):
        """Mapping of UID -> {action: {"success": bool, "idxs": list}}
        """
        return self.request_ahp["objects"]

    @objects.setter
    def objects(self, value):
        self.request_ahp["objects"] = value

    @property
    def num_calls(self):
        """Number of calls to queue_pool not yet matched by a resume
        """
        return self.request_ahp["num_calls"]

    @num_calls.setter
    def num_calls(self, value):
        self.request_ahp["num_calls"] = value

    @synchronized(max_connections=1)
    def queue_pool(self):
        """Notifies that a new batch of jobs is about to begin
        """
        self.num_calls += 1

    @synchronized(max_connections=1)
    def push(self, instance, action, success, idxs=_marker):
        """Adds an instance into the pool, to be reindexed on resume

        :param instance: the object the action has been applied to
        :param action: the id of the action
        :param success: whether the action took place successfully
        :param idxs: indexes to reindex on resume. An empty list (default if
            not passed) means all indexes, None means do not reindex
        """
        if self.request is None:
            # This is called by a non-request script
            instance.reindexObject()
            return

        uid = api.get_uid(instance)
        info = self.objects.get(uid, {})
        idx = [] if idxs is _marker else idxs
        info[action] = {"success": success, "idxs": idx}
        self.objects[uid] = info

    def succeed(self, instance, action):
        """Returns if the task for the instance took place successfully
        """
        uid = api.get_uid(instance)
        return self.objects.get(uid, {}).get(action, {}).get("success", False)

    @synchronized(max_connections=1)
    def resume(self):
        """Resumes the pool and reindex all objects processed
        """
        # do not decrease the counter below 0
        if self.num_calls > 0:
            self.num_calls -= 1

        # postpone for pending calls
        if self.num_calls > 0:
            return

        # return immediately if there are no objects in the queue
        count = len(self)
        if count == 0:
            return

        logger.info("Resume actions for {} objects".format(count))

        # Fetch the objects from the pool
        processed = set()
        for brain in api.search(dict(UID=self.objects.keys()), UID_CATALOG):
            uid = api.get_uid(brain)
            if uid in processed:
                # This object has been processed already, do nothing
                continue

            # Reindex the object
            obj = api.get_object(brain)
            idxs = self.get_indexes(uid)
            idxs_str = ", ".join(idxs) if idxs else "-- All indexes --"
            logger.info("Reindexing {}: {}".format(obj.getId(), idxs_str))
            obj.reindexObject(idxs=idxs)
            processed.add(uid)

        # Cleanup the pool
        logger.info("Objects processed: {}".format(len(processed)))
        self.flush()

    def get_indexes(self, uid):
        """Returns the names of the indexes to be reindexed for the object with
        the uid passed in.

        If any of the actions registered for the object has an empty list of
        indexes, returns an empty list (reindex all). Otherwise, returns the
        indexes specified by all the actions of the object (those with None
        as indexes are ignored), plus "review_state" and "is_active", without
        duplicates and in no particular order.
        """
        idxs = []
        info = self.objects.get(uid, {})
        for value in info.values():
            obj_idxs = value.get('idxs', None)
            if obj_idxs is None:
                # Don't reindex!
                continue
            elif len(obj_idxs) == 0:
                # Reindex all indexes!
                return []
            idxs.extend(obj_idxs)
        # Always reindex review_state and is_active
        idxs.extend(["review_state", "is_active"])
        return list(set(idxs))
629
630
631
def push_reindex_to_actions_pool(obj, idxs=None):
    """Push a reindex job to the actions handler pool

    :param obj: the object to be reindexed
    :param idxs: names of the indexes to reindex. If empty or None, an empty
        list is pushed, which ActionHandlerPool.get_indexes treats as
        "reindex all indexes"
    """
    indexes = idxs or []
    pool = ActionHandlerPool.get_instance()
    pool.push(obj, "reindex", success=True, idxs=indexes)
637