Passed
Push — 2.x ( 29276a...55d0fb )
by Ramon
08:51 queued 03:31
created

bika.lims.workflow.in_state()   A

Complexity

Conditions 2

Size

Total Lines 7
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 7
rs 10
c 0
b 0
f 0
cc 2
nop 3
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2021 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
21
import collections
22
import six
23
import sys
24
import threading
25
26
from AccessControl.SecurityInfo import ModuleSecurityInfo
27
from bika.lims import PMF
28
from bika.lims import api
29
from bika.lims import logger
30
from bika.lims.browser import ulocalized_time
31
from bika.lims.decorators import synchronized
32
from bika.lims.interfaces import IActionHandlerPool, IGuardAdapter
33
from bika.lims.interfaces import IJSONReadExtender
34
from bika.lims.jsonapi import get_include_fields
35
from bika.lims.utils import changeWorkflowState
36
from bika.lims.utils import t
37
from Products.Archetypes.config import UID_CATALOG
38
from Products.CMFCore.utils import getToolByName
39
from Products.CMFCore.WorkflowCore import WorkflowException
40
from zope.component import getAdapters
41
from zope.interface import implements
42
from ZPublisher.HTTPRequest import HTTPRequest
43
44
security = ModuleSecurityInfo('bika.lims.workflow')
45
security.declarePublic('guard_handler')
46
47
_marker = object()
48
49
50
def skip(instance, action, peek=False, unskip=False):
51
    """Returns True if the transition is to be SKIPPED
52
53
        peek - True just checks the value, does not set.
54
        unskip - remove skip key (for manual overrides).
55
56
    called with only (instance, action_id), this will set the request variable
57
    preventing the cascade's from re-transitioning the object and return None.
58
    """
59
60
    uid = callable(instance.UID) and instance.UID() or instance.UID
61
    skipkey = "%s_%s" % (uid, action)
62
    if 'workflow_skiplist' not in instance.REQUEST:
63
        if not peek and not unskip:
64
            instance.REQUEST['workflow_skiplist'] = [skipkey, ]
65
    else:
66
        if skipkey in instance.REQUEST['workflow_skiplist']:
67
            if unskip:
68
                instance.REQUEST['workflow_skiplist'].remove(skipkey)
69
            else:
70
                return True
71
        else:
72
            if not peek and not unskip:
73
                instance.REQUEST["workflow_skiplist"].append(skipkey)
74
75
76
def doActionFor(instance, action_id):
77
    """Tries to perform the transition to the instance.
78
    Object is reindexed after the transition takes place, but only if succeeds.
79
    :param instance: Object to be transitioned
80
    :param action_id: transition id
81
    :returns: True if the transition has been performed, together with message
82
    :rtype: tuple (bool,str)
83
    """
84
    if not instance:
85
        return False, ""
86
87
    if isinstance(instance, list):
88
        # TODO Workflow . Check if this is strictly necessary
89
        # This check is here because sometimes Plone creates a list
90
        # from submitted form elements.
91
        logger.warn("Got a list of obj in doActionFor!")
92
        if len(instance) > 1:
93
            logger.warn(
94
                "doActionFor is getting an instance parameter which is a list "
95
                "with more than one item. Instance: '{}', action_id: '{}'"
96
                .format(instance, action_id)
97
            )
98
99
        return doActionFor(instance=instance[0], action_id=action_id)
100
101
    # Since a given transition can cascade or promote to other objects, we want
102
    # to reindex all objects for which the transition succeed at once, at the
103
    # end of process. Otherwise, same object will be reindexed multiple times
104
    # unnecessarily.
105
    # Also, ActionsHandlerPool ensures the same transition is not applied twice
106
    # to the same object due to cascade/promote recursions.
107
    pool = ActionHandlerPool.get_instance()
108
    if pool.succeed(instance, action_id):
109
        return False, "Transition {} for {} already done"\
110
             .format(action_id, instance.getId())
111
112
    # Return False if transition is not permitted
113
    if not isTransitionAllowed(instance, action_id):
114
        return False, "Transition {} for {} is not allowed"\
115
            .format(action_id, instance.getId())
116
117
    # Add this batch process to the queue
118
    pool.queue_pool()
119
    succeed = False
120
    message = ""
121
    workflow = getToolByName(instance, "portal_workflow")
122
    try:
123
        workflow.doActionFor(instance, action_id)
124
        succeed = True
125
    except WorkflowException as e:
126
        message = str(e)
127
        curr_state = getCurrentState(instance)
128
        clazz_name = instance.__class__.__name__
129
        logger.warning(
130
            "Transition '{0}' not allowed: {1} '{2}' ({3})"
131
            .format(action_id, clazz_name, instance.getId(), curr_state))
132
        logger.error(message)
133
134
    # Add the current object to the pool and resume
135
    pool.push(instance, action_id, succeed)
136
    pool.resume()
137
138
    return succeed, message
139
140
141
def call_workflow_event(instance, event, after=True):
142
    """Calls the instance's workflow event
143
    """
144
    if not event.transition:
145
        return False
146
147
    portal_type = instance.portal_type
148
    wf_module = _load_wf_module('{}.events'.format(portal_type.lower()))
149
    if not wf_module:
150
        return False
151
152
    # Inspect if event_<transition_id> function exists in the module
153
    prefix = after and "after" or "before"
154
    func_name = "{}_{}".format(prefix, event.transition.id)
155
    func = getattr(wf_module, func_name, False)
156
    if not func:
157
        return False
158
159
    logger.info('WF event: {0}.events.{1}'
160
                .format(portal_type.lower(), func_name))
161
    func(instance)
162
    return True
163
164
165
def BeforeTransitionEventHandler(instance, event):
166
    """ This event is executed before each transition and delegates further
167
    actions to 'workflow.<portal_type>.events.before_<transition_id> function
168
    if exists for the instance passed in.
169
    :param instance: the instance to be transitioned
170
    :type instance: ATContentType
171
    :param event: event that holds the transition to be performed
172
    :type event: IObjectEvent
173
    """
174
    call_workflow_event(instance, event, after=False)
175
176
177
def AfterTransitionEventHandler(instance, event):
178
    """ This event is executed after each transition and delegates further
179
    actions to 'workflow.<portal_type>.events.after_<transition_id> function
180
    if exists for the instance passed in.
181
    :param instance: the instance that has been transitioned
182
    :type instance: ATContentType
183
    :param event: event that holds the transition performed
184
    :type event: IObjectEvent
185
    """
186
    if call_workflow_event(instance, event, after=True):
187
        return
188
189
    # Try with old AfterTransitionHandler dance...
190
    # TODO REMOVE AFTER PORTING workflow_script_*/*_transition_event
191
    if not event.transition:
192
        return
193
    # Set the request variable preventing cascade's from re-transitioning.
194
    if skip(instance, event.transition.id):
195
        return
196
    # Because at this point, the object has been transitioned already, but
197
    # further actions are probably needed still, so be sure is reindexed
198
    # before going forward.
199
    instance.reindexObject()
200
    key = 'after_{0}_transition_event'.format(event.transition.id)
201
    after_event = getattr(instance, key, False)
202
    if not after_event:
203
        # TODO Workflow. this conditional is only for backwards compatibility,
204
        # to be removed when all workflow_script_* methods in contents are
205
        # replaced by the more explicity signature 'after_*_transition_event'
206
        key = 'workflow_script_' + event.transition.id
207
        after_event = getattr(instance, key, False)
208
    if not after_event:
209
        return
210
    after_event()
211
212
213
def get_workflow_actions(obj):
214
    """ Compile a list of possible workflow transitions for this object
215
    """
216
217
    def translate(id):
218
        return t(PMF(id + "_transition_title"))
219
220
    transids = getAllowedTransitions(obj)
221
    actions = [{'id': it, 'title': translate(it)} for it in transids]
222
    return actions
223
224
225
def isTransitionAllowed(instance, transition_id):
226
    """Checks if the object can perform the transition passed in.
227
    :returns: True if transition can be performed
228
    :rtype: bool
229
    """
230
    wf_tool = getToolByName(instance, "portal_workflow")
231
    for wf_id in wf_tool.getChainFor(instance):
232
        wf = wf_tool.getWorkflowById(wf_id)
233
        if wf and wf.isActionSupported(instance, transition_id):
234
            return True
235
236
    return False
237
238
239
def getAllowedTransitions(instance):
240
    """Returns a list with the transition ids that can be performed against
241
    the instance passed in.
242
    :param instance: A content object
243
    :type instance: ATContentType
244
    :returns: A list of transition/action ids
245
    :rtype: list
246
    """
247
    wftool = getToolByName(instance, "portal_workflow")
248
    transitions = wftool.getTransitionsFor(instance)
249
    return [trans['id'] for trans in transitions]
250
251
252
def getCurrentState(obj, stateflowid='review_state'):
253
    """ The current state of the object for the state flow id specified
254
        Return empty if there's no workflow state for the object and flow id
255
    """
256
    return api.get_workflow_status_of(obj, stateflowid)
257
258
259
def in_state(obj, states, stateflowid='review_state'):
260
    """ Returns if the object passed matches with the states passed in
261
    """
262
    if not states:
263
        return False
264
    obj_state = getCurrentState(obj, stateflowid=stateflowid)
265
    return obj_state in states
266
267
268
def getTransitionActor(obj, action_id):
269
    """Returns the actor that performed a given transition. If transition has
270
    not been perormed, or current user has no privileges, returns None
271
    :return: the username of the user that performed the transition passed-in
272
    :type: string
273
    """
274
    review_history = api.get_review_history(obj)
275
    for event in review_history:
276
        if event.get('action') == action_id:
277
            return event.get('actor')
278
    return None
279
280
281
def getTransitionDate(obj, action_id, return_as_datetime=False):
282
    """
283
    Returns date of action for object. Sometimes we need this date in Datetime
284
    format and that's why added return_as_datetime param.
285
    """
286
    review_history = api.get_review_history(obj)
287
    for event in review_history:
288
        if event.get('action') == action_id:
289
            evtime = event.get('time')
290
            if return_as_datetime:
291
                return evtime
292
            if evtime:
293
                value = ulocalized_time(evtime, long_format=True,
294
                                        time_only=False, context=obj)
295
                return value
296
    return None
297
298
299
def getTransitionUsers(obj, action_id, last_user=False):
300
    """
301
    This function returns a list with the users who have done the transition.
302
    :action_id: a sring as the transition id.
303
    :last_user: a boolean to return only the last user triggering the
304
        transition or all of them.
305
    :returns: a list of user ids.
306
    """
307
    workflow = getToolByName(obj, 'portal_workflow')
308
    users = []
309
    try:
310
        # https://jira.bikalabs.com/browse/LIMS-2242:
311
        # Sometimes the workflow history is inexplicably missing!
312
        review_history = list(workflow.getInfoFor(obj, 'review_history'))
313
    except WorkflowException:
314
        logger.error(
315
            "workflow history is inexplicably missing."
316
            " https://jira.bikalabs.com/browse/LIMS-2242")
317
        return users
318
    # invert the list, so we always see the most recent matching event
319
    review_history.reverse()
320
    for event in review_history:
321
        if event.get('action', '') == action_id:
322
            value = event.get('actor', '')
323
            users.append(value)
324
            if last_user:
325
                return users
326
    return users
327
328
329
def guard_handler(instance, transition_id):
330
    """Generic workflow guard handler that returns true if the transition_id
331
    passed in can be performed to the instance passed in.
332
333
    This function is called automatically by a Script (Python) located at
334
    bika/lims/skins/guard_handler.py, which in turn is fired by Zope when an
335
    expression like "python:here.guard_handler('<transition_id>')" is set to
336
    any given guard (used by default in all bika's DC Workflow guards).
337
338
    Walks through bika.lims.workflow.<obj_type>.guards and looks for a function
339
    that matches with 'guard_<transition_id>'. If found, calls the function and
340
    returns its value (true or false). If not found, returns True by default.
341
342
    :param instance: the object for which the transition_id has to be evaluated
343
    :param transition_id: the id of the transition
344
    :type instance: ATContentType
345
    :type transition_id: string
346
    :return: true if the transition can be performed to the passed in instance
347
    :rtype: bool
348
    """
349
    if not instance:
350
        return True
351
352
    # If adapters are found, core's guard will only be evaluated if, and only
353
    # if, ALL "pre-guards" return True
354
    for name, ad in getAdapters((instance,), IGuardAdapter):
355
        if ad.guard(transition_id) is False:
356
            return False
357
358
    clazz_name = instance.portal_type
359
    # Inspect if bika.lims.workflow.<clazzname>.<guards> module exists
360
    wf_module = _load_wf_module('{0}.guards'.format(clazz_name.lower()))
361
    if not wf_module:
362
        return True
363
364
    # Inspect if guard_<transition_id> function exists in the above module
365
    key = 'guard_{0}'.format(transition_id)
366
    guard = getattr(wf_module, key, False)
367
    if not guard:
368
        return True
369
370
    return guard(instance)
371
372
373
def _load_wf_module(module_relative_name):
374
    """Loads a python module based on the module relative name passed in.
375
376
    At first, tries to get the module from sys.modules. If not found there, the
377
    function tries to load it by using importlib. Returns None if no module
378
    found or importlib is unable to load it because of errors.
379
    Eg:
380
        _load_wf_module('sample.events')
381
382
    will try to load the module 'bika.lims.workflow.sample.events'
383
384
    :param modrelname: relative name of the module to be loaded
385
    :type modrelname: string
386
    :return: the module
387
    :rtype: module
388
    """
389
    if not module_relative_name:
390
        return None
391
    if not isinstance(module_relative_name, six.string_types):
392
        return None
393
394
    rootmodname = __name__
395
    modulekey = '{0}.{1}'.format(rootmodname, module_relative_name)
396
    if modulekey in sys.modules:
397
        return sys.modules.get(modulekey, None)
398
399
    # Try to load the module recursively
400
    modname = None
401
    tokens = module_relative_name.split('.')
402
    for part in tokens:
403
        modname = '.'.join([modname, part]) if modname else part
404
        import importlib
405
        try:
406
            _module = importlib.import_module('.'+modname, package=rootmodname)
407
            if not _module:
408
                return None
409
        except Exception:
410
            return None
411
    return sys.modules.get(modulekey, None)
412
413
414
class JSONReadExtender(object):
415
416
    """- Adds the list of possible transitions to each object, if 'transitions'
417
    is specified in the include_fields.
418
    """
419
420
    implements(IJSONReadExtender)
421
422
    def __init__(self, context):
423
        self.context = context
424
425
    def __call__(self, request, data):
426
        include_fields = get_include_fields(request)
427
        if not include_fields or "transitions" in include_fields:
428
            data['transitions'] = get_workflow_actions(self.context)
429
430
431
class ActionHandlerPool(object):
432
    """Singleton to handle concurrent transitions
433
    """
434
    implements(IActionHandlerPool)
435
436
    __instance = None
437
    __lock = threading.Lock()
438
439
    @staticmethod
440
    def get_instance():
441
        """Returns the current instance of ActionHandlerPool
442
443
        TODO: Refactor to global utility
444
        """
445
        if ActionHandlerPool.__instance is None:
446
            # Thread-safe
447
            with ActionHandlerPool.__lock:
448
                if ActionHandlerPool.__instance is None:
449
                    ActionHandlerPool()
450
        return ActionHandlerPool.__instance
451
452
    def __init__(self):
453
        if ActionHandlerPool.__instance is not None:
454
            raise Exception("Use ActionHandlerPool.get_instance()")
455
        ActionHandlerPool.__instance = self
456
457
    def __len__(self):
458
        """Number of objects in the pool
459
        """
460
        return len(self.objects)
461
462
    def __repr__(self):
463
        """Better repr
464
        """
465
        return "<ActionHandlerPool for UIDs:[{}]>".format(
466
            ",".join(map(api.get_uid, self.objects)))
467
468
    def flush(self):
469
        self.objects = collections.OrderedDict()
470
        self.num_calls = 0
471
472
    @property
473
    def request(self):
474
        request = api.get_request()
475
        if not isinstance(request, HTTPRequest):
476
            return None
477
        return request
478
479
    @property
480
    def request_ahp(self):
481
        data = {
482
            "objects": collections.OrderedDict(),
483
            "num_calls": 0
484
        }
485
486
        request = self.request
487
        if request is None:
488
            # Maybe this is called by a non-request script
489
            return data
490
491
        if "__action_handler_pool" not in request:
492
            request["__action_handler_pool"] = data
493
        return request["__action_handler_pool"]
494
495
    @property
496
    def objects(self):
497
        return self.request_ahp["objects"]
498
499
    @objects.setter
500
    def objects(self, value):
501
        self.request_ahp["objects"] = value
502
503
    @property
504
    def num_calls(self):
505
        return self.request_ahp["num_calls"]
506
507
    @num_calls.setter
508
    def num_calls(self, value):
509
        self.request_ahp["num_calls"] = value
510
511
    @synchronized(max_connections=1)
512
    def queue_pool(self):
513
        """Notifies that a new batch of jobs is about to begin
514
        """
515
        self.num_calls += 1
516
517
    @synchronized(max_connections=1)
518
    def push(self, instance, action, success):
519
        """Adds an instance into the pool, to be reindexed on resume
520
        """
521
        if self.request is None:
522
            # This is called by a non-request script
523
            instance.reindexObject()
524
            return
525
526
        uid = api.get_uid(instance)
527
        info = self.objects.get(uid, {})
528
        info[action] = {"success": success}
529
        self.objects[uid] = info
530
531
    def succeed(self, instance, action):
532
        """Returns if the task for the instance took place successfully
533
        """
534
        uid = api.get_uid(instance)
535
        return self.objects.get(uid, {}).get(action, {}).get("success", False)
536
537
    @synchronized(max_connections=1)
538
    def resume(self):
539
        """Resumes the pool and reindex all objects processed
540
        """
541
        # do not decrease the counter below 0
542
        if self.num_calls > 0:
543
            self.num_calls -= 1
544
545
        # postpone for pending calls
546
        if self.num_calls > 0:
547
            return
548
549
        # return immediately if there are no objects in the queue
550
        count = len(self)
551
        if count == 0:
552
            return
553
554
        logger.info("Resume actions for {} objects".format(count))
555
556
        # Fetch the objects from the pool
557
        query = {"UID": self.objects.keys()}
558
        brains = api.search(query, UID_CATALOG)
559
        for brain in api.search(query, UID_CATALOG):
560
            # Reindex the object
561
            obj = api.get_object(brain)
562
            obj.reindexObject()
563
564
        # Cleanup the pool
565
        logger.info("Objects processed: {}".format(len(brains)))
566
        self.flush()
567
568
569
def push_reindex_to_actions_pool(obj):
570
    """Push a reindex job to the actions handler pool
571
    """
572
    pool = ActionHandlerPool.get_instance()
573
    pool.push(obj, "reindex", success=True)
574