Passed
Push — master ( e83033...10ce0d )
by Ramon
05:04 queued 01:01
created

bika.lims.workflow.getAllowedTransitions()   A

Complexity

Conditions 1

Size

Total Lines 11
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 4
dl 0
loc 11
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2019 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import collections
22
import sys
23
import threading
24
25
from AccessControl.SecurityInfo import ModuleSecurityInfo
26
from bika.lims import PMF
27
from bika.lims import api
28
from bika.lims import logger
29
from bika.lims.browser import ulocalized_time
30
from bika.lims.decorators import synchronized
31
from bika.lims.interfaces import IActionHandlerPool, IGuardAdapter
32
from bika.lims.interfaces import IJSONReadExtender
33
from bika.lims.jsonapi import get_include_fields
34
from bika.lims.utils import changeWorkflowState  # noqa
35
from bika.lims.utils import t
36
from bika.lims.workflow.indexes import ACTIONS_TO_INDEXES
37
from Products.Archetypes.config import UID_CATALOG
38
from Products.CMFCore.utils import getToolByName
39
from Products.CMFCore.WorkflowCore import WorkflowException
40
from zope.component import getAdapters
41
from zope.interface import implements
42
from ZPublisher.HTTPRequest import HTTPRequest
43
44
security = ModuleSecurityInfo('bika.lims.workflow')
45
security.declarePublic('guard_handler')
46
47
_marker = object()
48
49
50
def skip(instance, action, peek=False, unskip=False):
51
    """Returns True if the transition is to be SKIPPED
52
53
        peek - True just checks the value, does not set.
54
        unskip - remove skip key (for manual overrides).
55
56
    called with only (instance, action_id), this will set the request variable
57
    preventing the cascade's from re-transitioning the object and return None.
58
    """
59
60
    uid = callable(instance.UID) and instance.UID() or instance.UID
61
    skipkey = "%s_%s" % (uid, action)
62
    if 'workflow_skiplist' not in instance.REQUEST:
63
        if not peek and not unskip:
64
            instance.REQUEST['workflow_skiplist'] = [skipkey, ]
65
    else:
66
        if skipkey in instance.REQUEST['workflow_skiplist']:
67
            if unskip:
68
                instance.REQUEST['workflow_skiplist'].remove(skipkey)
69
            else:
70
                return True
71
        else:
72
            if not peek and not unskip:
73
                instance.REQUEST["workflow_skiplist"].append(skipkey)
74
75
76
def doActionFor(instance, action_id, idxs=None):
77
    """Tries to perform the transition to the instance.
78
    Object is reindexed after the transition takes place, but only if succeeds.
79
    If idxs is set, only these indexes will be reindexed. Otherwise, will try
80
    to use the indexes defined in ACTIONS_TO_INDEX mapping if any.
81
    :param instance: Object to be transitioned
82
    :param action_id: transition id
83
    :param idxs: indexes to be reindexed after the transition
84
    :returns: True if the transition has been performed, together with message
85
    :rtype: tuple (bool,str)
86
    """
87
    if not instance:
88
        return False, ""
89
90
    if isinstance(instance, list):
91
        # TODO Workflow . Check if this is strictly necessary
92
        # This check is here because sometimes Plone creates a list
93
        # from submitted form elements.
94
        logger.warn("Got a list of obj in doActionFor!")
95
        if len(instance) > 1:
96
            logger.warn(
97
                "doActionFor is getting an instance parameter which is a list "
98
                "with more than one item. Instance: '{}', action_id: '{}'"
99
                .format(instance, action_id)
100
            )
101
102
        return doActionFor(
103
            instance=instance[0], action_id=action_id, idxs=idxs)
104
105
    # Since a given transition can cascade or promote to other objects, we want
106
    # to reindex all objects for which the transition succeed at once, at the
107
    # end of process. Otherwise, same object will be reindexed multiple times
108
    # unnecessarily.
109
    # Also, ActionsHandlerPool ensures the same transition is not applied twice
110
    # to the same object due to cascade/promote recursions.
111
    pool = ActionHandlerPool.get_instance()
112
    if pool.succeed(instance, action_id):
113
        return False, "Transition {} for {} already done"\
114
             .format(action_id, instance.getId())
115
116
    # Return False if transition is not permitted
117
    if not isTransitionAllowed(instance, action_id):
118
        return False, "Transition {} for {} is not allowed"\
119
            .format(action_id, instance.getId())
120
121
    # Add this batch process to the queue
122
    pool.queue_pool()
123
    succeed = False
124
    message = ""
125
    workflow = getToolByName(instance, "portal_workflow")
126
    try:
127
        workflow.doActionFor(instance, action_id)
128
        succeed = True
129
    except WorkflowException as e:
130
        message = str(e)
131
        curr_state = getCurrentState(instance)
132
        clazz_name = instance.__class__.__name__
133
        logger.warning(
134
            "Transition '{0}' not allowed: {1} '{2}' ({3})"
135
            .format(action_id, clazz_name, instance.getId(), curr_state))
136
        logger.error(message)
137
138
    # If no indexes to reindex have been defined, try to use those defined in
139
    # the ACTIONS_TO_INDEXES mapping. Reindexing only those indexes that might
140
    # be affected by the transition boosts the overall performance!.
141
    if idxs is None:
142
        portal_type = instance.portal_type
143
        idxs = ACTIONS_TO_INDEXES.get(portal_type, {}).get(action_id, [])
144
145
    # Add the current object to the pool and resume
146
    pool.push(instance, action_id, succeed, idxs=idxs)
147
    pool.resume()
148
149
    return succeed, message
150
151
152
def call_workflow_event(instance, event, after=True):
153
    """Calls the instance's workflow event
154
    """
155
    if not event.transition:
156
        return False
157
158
    portal_type = instance.portal_type
159
    wf_module = _load_wf_module('{}.events'.format(portal_type.lower()))
160
    if not wf_module:
161
        return False
162
163
    # Inspect if event_<transition_id> function exists in the module
164
    prefix = after and "after" or "before"
165
    func_name = "{}_{}".format(prefix, event.transition.id)
166
    func = getattr(wf_module, func_name, False)
167
    if not func:
168
        return False
169
170
    logger.info('WF event: {0}.events.{1}'
171
                .format(portal_type.lower(), func_name))
172
    func(instance)
173
    return True
174
175
176
def BeforeTransitionEventHandler(instance, event):
177
    """ This event is executed before each transition and delegates further
178
    actions to 'workflow.<portal_type>.events.before_<transition_id> function
179
    if exists for the instance passed in.
180
    :param instance: the instance to be transitioned
181
    :type instance: ATContentType
182
    :param event: event that holds the transition to be performed
183
    :type event: IObjectEvent
184
    """
185
    call_workflow_event(instance, event, after=False)
186
187
188
def AfterTransitionEventHandler(instance, event):
189
    """ This event is executed after each transition and delegates further
190
    actions to 'workflow.<portal_type>.events.after_<transition_id> function
191
    if exists for the instance passed in.
192
    :param instance: the instance that has been transitioned
193
    :type instance: ATContentType
194
    :param event: event that holds the transition performed
195
    :type event: IObjectEvent
196
    """
197
    if call_workflow_event(instance, event, after=True):
198
        return
199
200
    # Try with old AfterTransitionHandler dance...
201
    # TODO REMOVE AFTER PORTING workflow_script_*/*_transition_event
202
    if not event.transition:
203
        return
204
    # Set the request variable preventing cascade's from re-transitioning.
205
    if skip(instance, event.transition.id):
206
        return
207
    # Because at this point, the object has been transitioned already, but
208
    # further actions are probably needed still, so be sure is reindexed
209
    # before going forward.
210
    instance.reindexObject()
211
    key = 'after_{0}_transition_event'.format(event.transition.id)
212
    after_event = getattr(instance, key, False)
213
    if not after_event:
214
        # TODO Workflow. this conditional is only for backwards compatibility,
215
        # to be removed when all workflow_script_* methods in contents are
216
        # replaced by the more explicity signature 'after_*_transition_event'
217
        key = 'workflow_script_' + event.transition.id
218
        after_event = getattr(instance, key, False)
219
    if not after_event:
220
        return
221
    after_event()
222
223
224
def get_workflow_actions(obj):
225
    """ Compile a list of possible workflow transitions for this object
226
    """
227
228
    def translate(id):
229
        return t(PMF(id + "_transition_title"))
230
231
    transids = getAllowedTransitions(obj)
232
    actions = [{'id': it, 'title': translate(it)} for it in transids]
233
    return actions
234
235
236
def isTransitionAllowed(instance, transition_id):
237
    """Checks if the object can perform the transition passed in.
238
    :returns: True if transition can be performed
239
    :rtype: bool
240
    """
241
    wf_tool = getToolByName(instance, "portal_workflow")
242
    for wf_id in wf_tool.getChainFor(instance):
243
        wf = wf_tool.getWorkflowById(wf_id)
244
        if wf and wf.isActionSupported(instance, transition_id):
245
            return True
246
247
    return False
248
249
250
def getAllowedTransitions(instance):
251
    """Returns a list with the transition ids that can be performed against
252
    the instance passed in.
253
    :param instance: A content object
254
    :type instance: ATContentType
255
    :returns: A list of transition/action ids
256
    :rtype: list
257
    """
258
    wftool = getToolByName(instance, "portal_workflow")
259
    transitions = wftool.getTransitionsFor(instance)
260
    return [trans['id'] for trans in transitions]
261
262
263
def get_review_history_statuses(instance, reverse=False):
264
    """Returns a list with the statuses of the instance from the review_history
265
    """
266
    review_history = getReviewHistory(instance, reverse=reverse)
267
    return map(lambda event: event["review_state"], review_history)
268
269
270
def get_prev_status_from_history(instance, status=None):
271
    """Returns the previous status of the object. If status is set, returns the
272
    previous status before the object reached the status passed in.
273
    If instance has reached the status passed in more than once, only the last
274
    one is considered.
275
    """
276
    target = status or api.get_workflow_status_of(instance)
277
    history = getReviewHistory(instance, reverse=True)
278
    history = map(lambda event: event["review_state"], history)
279
    if target not in history or history.index(target) == len(history)-1:
280
        return None
281
    return history[history.index(target)+1]
282
283
284
def getReviewHistory(instance, reverse=True):
285
    """Returns the review history for the instance
286
    :returns: the list of historic events as dicts
287
    """
288
    return api.get_review_history(instance, rev=reverse)
289
290
291
def getCurrentState(obj, stateflowid='review_state'):
292
    """ The current state of the object for the state flow id specified
293
        Return empty if there's no workflow state for the object and flow id
294
    """
295
    return api.get_workflow_status_of(obj, stateflowid)
296
297
298
def in_state(obj, states, stateflowid='review_state'):
299
    """ Returns if the object passed matches with the states passed in
300
    """
301
    if not states:
302
        return False
303
    obj_state = getCurrentState(obj, stateflowid=stateflowid)
304
    return obj_state in states
305
306
307
def getTransitionActor(obj, action_id):
308
    """Returns the actor that performed a given transition. If transition has
309
    not been perormed, or current user has no privileges, returns None
310
    :return: the username of the user that performed the transition passed-in
311
    :type: string
312
    """
313
    review_history = getReviewHistory(obj)
314
    for event in review_history:
315
        if event.get('action') == action_id:
316
            return event.get('actor')
317
    return None
318
319
320
def getTransitionDate(obj, action_id, return_as_datetime=False):
321
    """
322
    Returns date of action for object. Sometimes we need this date in Datetime
323
    format and that's why added return_as_datetime param.
324
    """
325
    review_history = getReviewHistory(obj)
326
    for event in review_history:
327
        if event.get('action') == action_id:
328
            evtime = event.get('time')
329
            if return_as_datetime:
330
                return evtime
331
            if evtime:
332
                value = ulocalized_time(evtime, long_format=True,
333
                                        time_only=False, context=obj)
334
                return value
335
    return None
336
337
338
def getTransitionUsers(obj, action_id, last_user=False):
339
    """
340
    This function returns a list with the users who have done the transition.
341
    :action_id: a sring as the transition id.
342
    :last_user: a boolean to return only the last user triggering the
343
        transition or all of them.
344
    :returns: a list of user ids.
345
    """
346
    workflow = getToolByName(obj, 'portal_workflow')
347
    users = []
348
    try:
349
        # https://jira.bikalabs.com/browse/LIMS-2242:
350
        # Sometimes the workflow history is inexplicably missing!
351
        review_history = list(workflow.getInfoFor(obj, 'review_history'))
352
    except WorkflowException:
353
        logger.error(
354
            "workflow history is inexplicably missing."
355
            " https://jira.bikalabs.com/browse/LIMS-2242")
356
        return users
357
    # invert the list, so we always see the most recent matching event
358
    review_history.reverse()
359
    for event in review_history:
360
        if event.get('action', '') == action_id:
361
            value = event.get('actor', '')
362
            users.append(value)
363
            if last_user:
364
                return users
365
    return users
366
367
368
def guard_handler(instance, transition_id):
369
    """Generic workflow guard handler that returns true if the transition_id
370
    passed in can be performed to the instance passed in.
371
372
    This function is called automatically by a Script (Python) located at
373
    bika/lims/skins/guard_handler.py, which in turn is fired by Zope when an
374
    expression like "python:here.guard_handler('<transition_id>')" is set to
375
    any given guard (used by default in all bika's DC Workflow guards).
376
377
    Walks through bika.lims.workflow.<obj_type>.guards and looks for a function
378
    that matches with 'guard_<transition_id>'. If found, calls the function and
379
    returns its value (true or false). If not found, returns True by default.
380
381
    :param instance: the object for which the transition_id has to be evaluated
382
    :param transition_id: the id of the transition
383
    :type instance: ATContentType
384
    :type transition_id: string
385
    :return: true if the transition can be performed to the passed in instance
386
    :rtype: bool
387
    """
388
    if not instance:
389
        return True
390
391
    # If adapters are found, core's guard will only be evaluated if, and only
392
    # if, ALL "pre-guards" return True
393
    for name, ad in getAdapters((instance,), IGuardAdapter):
394
        if ad.guard(transition_id) is False:
395
            return False
396
397
    clazz_name = instance.portal_type
398
    # Inspect if bika.lims.workflow.<clazzname>.<guards> module exists
399
    wf_module = _load_wf_module('{0}.guards'.format(clazz_name.lower()))
400
    if not wf_module:
401
        return True
402
403
    # Inspect if guard_<transition_id> function exists in the above module
404
    key = 'guard_{0}'.format(transition_id)
405
    guard = getattr(wf_module, key, False)
406
    if not guard:
407
        return True
408
409
    return guard(instance)
410
411
412
def _load_wf_module(module_relative_name):
413
    """Loads a python module based on the module relative name passed in.
414
415
    At first, tries to get the module from sys.modules. If not found there, the
416
    function tries to load it by using importlib. Returns None if no module
417
    found or importlib is unable to load it because of errors.
418
    Eg:
419
        _load_wf_module('sample.events')
420
421
    will try to load the module 'bika.lims.workflow.sample.events'
422
423
    :param modrelname: relative name of the module to be loaded
424
    :type modrelname: string
425
    :return: the module
426
    :rtype: module
427
    """
428
    if not module_relative_name:
429
        return None
430
    if not isinstance(module_relative_name, basestring):
431
        return None
432
433
    rootmodname = __name__
434
    modulekey = '{0}.{1}'.format(rootmodname, module_relative_name)
435
    if modulekey in sys.modules:
436
        return sys.modules.get(modulekey, None)
437
438
    # Try to load the module recursively
439
    modname = None
440
    tokens = module_relative_name.split('.')
441
    for part in tokens:
442
        modname = '.'.join([modname, part]) if modname else part
443
        import importlib
444
        try:
445
            _module = importlib.import_module('.'+modname, package=rootmodname)
446
            if not _module:
447
                return None
448
        except Exception:
449
            return None
450
    return sys.modules.get(modulekey, None)
451
452
453
class JSONReadExtender(object):
454
455
    """- Adds the list of possible transitions to each object, if 'transitions'
456
    is specified in the include_fields.
457
    """
458
459
    implements(IJSONReadExtender)
460
461
    def __init__(self, context):
462
        self.context = context
463
464
    def __call__(self, request, data):
465
        include_fields = get_include_fields(request)
466
        if not include_fields or "transitions" in include_fields:
467
            data['transitions'] = get_workflow_actions(self.context)
468
469
470
class ActionHandlerPool(object):
471
    """Singleton to handle concurrent transitions
472
    """
473
    implements(IActionHandlerPool)
474
475
    __instance = None
476
    __lock = threading.Lock()
477
478
    @staticmethod
479
    def get_instance():
480
        """Returns the current instance of ActionHandlerPool
481
482
        TODO: Refactor to global utility
483
        """
484
        if ActionHandlerPool.__instance is None:
485
            # Thread-safe
486
            with ActionHandlerPool.__lock:
487
                if ActionHandlerPool.__instance is None:
488
                    ActionHandlerPool()
489
        return ActionHandlerPool.__instance
490
491
    def __init__(self):
492
        if ActionHandlerPool.__instance is not None:
493
            raise Exception("Use ActionHandlerPool.get_instance()")
494
        ActionHandlerPool.__instance = self
495
496
    def __len__(self):
497
        """Number of objects in the pool
498
        """
499
        return len(self.objects)
500
501
    def __repr__(self):
502
        """Better repr
503
        """
504
        return "<ActionHandlerPool for UIDs:[{}]>".format(
505
            ",".join(map(api.get_uid, self.objects)))
506
507
    def flush(self):
508
        self.objects = collections.OrderedDict()
509
        self.num_calls = 0
510
511
    @property
512
    def request(self):
513
        request = api.get_request()
514
        if not isinstance(request, HTTPRequest):
515
            return None
516
        return request
517
518
    @property
519
    def request_ahp(self):
520
        data = {
521
            "objects": collections.OrderedDict(),
522
            "num_calls": 0
523
        }
524
525
        request = self.request
526
        if request is None:
527
            # Maybe this is called by a non-request script
528
            return data
529
530
        if "__action_handler_pool" not in request:
531
            request["__action_handler_pool"] = data
532
        return request["__action_handler_pool"]
533
534
    @property
535
    def objects(self):
536
        return self.request_ahp["objects"]
537
538
    @objects.setter
539
    def objects(self, value):
540
        self.request_ahp["objects"] = value
541
542
    @property
543
    def num_calls(self):
544
        return self.request_ahp["num_calls"]
545
546
    @num_calls.setter
547
    def num_calls(self, value):
548
        self.request_ahp["num_calls"] = value
549
550
    @synchronized(max_connections=1)
551
    def queue_pool(self):
552
        """Notifies that a new batch of jobs is about to begin
553
        """
554
        self.num_calls += 1
555
556
    @synchronized(max_connections=1)
557
    def push(self, instance, action, success, idxs=_marker):
558
        """Adds an instance into the pool, to be reindexed on resume
559
        """
560
        if self.request is None:
561
            # This is called by a non-request script
562
            instance.reindexObject()
563
            return
564
565
        uid = api.get_uid(instance)
566
        info = self.objects.get(uid, {})
567
        idx = [] if idxs is _marker else idxs
568
        info[action] = {"success": success, "idxs": idx}
569
        self.objects[uid] = info
570
571
    def succeed(self, instance, action):
572
        """Returns if the task for the instance took place successfully
573
        """
574
        uid = api.get_uid(instance)
575
        return self.objects.get(uid, {}).get(action, {}).get("success", False)
576
577
    @synchronized(max_connections=1)
578
    def resume(self):
579
        """Resumes the pool and reindex all objects processed
580
        """
581
        # do not decrease the counter below 0
582
        if self.num_calls > 0:
583
            self.num_calls -= 1
584
585
        # postpone for pending calls
586
        if self.num_calls > 0:
587
            return
588
589
        # return immediately if there are no objects in the queue
590
        count = len(self)
591
        if count == 0:
592
            return
593
594
        logger.info("Resume actions for {} objects".format(count))
595
596
        # Fetch the objects from the pool
597
        processed = list()
598
        for brain in api.search(dict(UID=self.objects.keys()), UID_CATALOG):
599
            uid = api.get_uid(brain)
600
            if uid in processed:
601
                # This object has been processed already, do nothing
602
                continue
603
604
            # Reindex the object
605
            obj = api.get_object(brain)
606
            idxs = self.get_indexes(uid)
607
            idxs_str = idxs and ", ".join(idxs) or "-- All indexes --"
608
            logger.info("Reindexing {}: {}".format(obj.getId(), idxs_str))
609
            obj.reindexObject(idxs=idxs)
610
            processed.append(uid)
611
612
        # Cleanup the pool
613
        logger.info("Objects processed: {}".format(len(processed)))
614
        self.flush()
615
616
    def get_indexes(self, uid):
617
        """Returns the names of the indexes to be reindexed for the object with
618
        the uid passed in. If no indexes for this object have been specified
619
        within the action pool job, returns an empty list (reindex all).
620
        Otherwise, return all the indexes that have been specified for the
621
        object within the action pool job.
622
        """
623
        idxs = []
624
        info = self.objects.get(uid, {})
625
        for action_id, value in info.items():
626
            obj_idxs = value.get('idxs', None)
627
            if obj_idxs is None:
628
                # Don't reindex!
629
                continue
630
            elif len(obj_idxs) == 0:
631
                # Reindex all indexes!
632
                return []
633
            idxs.extend(obj_idxs)
634
        # Always reindex review_state and is_active
635
        idxs.extend(["review_state", "is_active"])
636
        return list(set(idxs))
637
638
639
def push_reindex_to_actions_pool(obj, idxs=None):
640
    """Push a reindex job to the actions handler pool
641
    """
642
    indexes = idxs and idxs or []
643
    pool = ActionHandlerPool.get_instance()
644
    pool.push(obj, "reindex", success=True, idxs=indexes)
645