Passed
Push — 2.x ( 1cb17a...38529c )
by Ramon
07:04
created

senaite.core.api.workflow.check_guard()   A

Complexity

Conditions 4

Size

Total Lines 30
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 12
dl 0
loc 30
rs 9.8
c 0
b 0
f 0
cc 4
nop 2
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of SENAITE.CORE.
4
#
5
# SENAITE.CORE is free software: you can redistribute it and/or modify it under
6
# the terms of the GNU General Public License as published by the Free Software
7
# Foundation, version 2.
8
#
9
# This program is distributed in the hope that it will be useful, but WITHOUT
10
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
12
# details.
13
#
14
# You should have received a copy of the GNU General Public License along with
15
# this program; if not, write to the Free Software Foundation, Inc., 51
16
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17
#
18
# Copyright 2018-2025 by it's authors.
19
# Some rights reserved, see README and LICENSE.
20
from Products.DCWorkflow.DCWorkflow import DCWorkflowDefinition
21
from Products.DCWorkflow.Expression import StateChangeInfo
22
from Products.DCWorkflow.Expression import createExprContext
23
from Products.DCWorkflow.Guard import Guard
24
from Products.DCWorkflow.States import StateDefinition
25
from Products.DCWorkflow.Transitions import TransitionDefinition
26
from bika.lims import api
27
from bika.lims.api import _marker
28
from senaite.core import logger
29
30
31
def get_workflow(thing, default=_marker):
    """Resolves the primary workflow definition of the thing passed-in

    The resolution is done as follows:

    - a workflow definition is returned as-is
    - a workflow state or transition returns the workflow it belongs to
    - a string is first looked-up as a workflow id
    - a string or an object is then looked-up through the workflow chain
      assigned to it. The workflow is only returned if the chain contains
      exactly one workflow

    :param thing: A single catalog brain, content object, supermodel, workflow,
        workflow id, workflow state, workflow transition or portal type
    :type thing: DCWorkflowDefinition/StateDefinition/TransitionDefinition/
        ATContentType/DexterityContentType/CatalogBrain/SuperModel/string
    :param default: (optional) fallback used when no single workflow can be
        resolved. None is returned as-is, any other value is resolved as if
        it was the thing passed-in
    :return: The primary workflow of the thing
    :rtype: Products.DCWorkflow.DCWorkflow.DCWorkflowDefinition
    :raises ValueError: if no default is given and the thing has no workflow,
        has more than one workflow or is of a non-supported type
    """
    # A workflow definition resolves to itself
    if isinstance(thing, DCWorkflowDefinition):
        return thing

    # States and transitions know the workflow they belong to
    if isinstance(thing, (StateDefinition, TransitionDefinition)):
        return thing.getWorkflow()

    is_string = api.is_string(thing)
    if is_string or api.is_object(thing):
        portal_workflow = api.get_tool("portal_workflow")

        if is_string:
            # The string might be the id of a workflow
            by_id = portal_workflow.getWorkflowById(thing)
            if by_id:
                return by_id

        # Rely on the workflow chain of the portal type or object
        chain = portal_workflow.getChainFor(thing)
        if len(chain) == 1:
            return portal_workflow.getWorkflowById(chain[0])

        if default is _marker:
            if len(chain) > 1:
                raise ValueError("More than one workflow: %s" % repr(thing))
            raise ValueError("Workflow not found: %s" % repr(thing))

    elif default is _marker:
        raise ValueError("Type is not supported: %s" % repr(type(thing)))

    # No workflow resolved, but a fallback was provided
    if default is None:
        return default
    return get_workflow(default)
75
76
77
def get_state(workflow, state_id, default=_marker):
    """Looks-up a state definition by id in the given workflow

    :param workflow: Workflow object or workflow id
    :type workflow: DCWorkflowDefinition/string
    :param state_id: Workflow state id
    :type state_id: string
    :param default: (optional) value to return when the state is not found
    :return: The state object for the given workflow and id
    :rtype: Products.DCWorkflow.States.StateDefinition
    :raises ValueError: if the state is not found and no default is given
    """
    definition = get_workflow(workflow)
    found = definition.states.get(state_id)
    if not found:
        if default is _marker:
            msg = "State %s not found for %s" % (state_id, definition.id)
            raise ValueError(msg)
        return default
    return found
93
94
95
def get_transition(workflow, transition_id, default=_marker):
    """Looks-up a transition definition by id in the given workflow

    :param workflow: Workflow object or workflow id
    :type workflow: DCWorkflowDefinition/string
    :param transition_id: Workflow transition id
    :type transition_id: string
    :param default: (optional) value to return when the transition is not
        found
    :return: The transition object for the given workflow and id
    :rtype: Products.DCWorkflow.Transitions.TransitionDefinition
    :raises ValueError: if the transition is not found and no default is given
    """
    definition = get_workflow(workflow)
    found = definition.transitions.get(transition_id)
    if not found:
        if default is _marker:
            msg = "Transition %s not found for %s" % (
                transition_id, definition.id)
            raise ValueError(msg)
        return default
    return found
111
112
113
def update_workflow(workflow, states=None, transitions=None, **kwargs):
    """Updates the basic settings, states and transitions of a workflow

    States and transitions that do not exist yet in the workflow are created
    before the settings passed-in are applied to them.

    Usage::

        >>> from senaite.core import permissions
        >>> from senaite.core.workflow import SAMPLE_WORKFLOW
        >>> states = {
        ...     "stored": {
        ...         "title": "Stored",
        ...         "description": "Sample is stored",
        ...         # Use tuples to overwrite existing transitions. To extend
        ...         # existing transitions, use a list
        ...         "transitions": ("recover", "detach", "dispatch", ),
        ...         # Copy permissions from sample_received first
        ...         "permissions_copy_from": "sample_received",
        ...         # Permissions mapping
        ...         "permissions": {
        ...             # Use tuples to overwrite existing and acquire=False.
        ...             # To extend existing roles, use a list
        ...             permissions.TransitionCancelAnalysisRequest: (),
        ...             permissions.TransitionReinstateAnalysisRequest: (),
        ...         }
        ...     },
        ... }
        >>> trans = {
        ...     "store": {
        ...         "title": "Store",
        ...         "new_state": "stored",
        ...         "action": "Store sample",
        ...         "guard": {
        ...             "guard_permissions": "",
        ...             "guard_roles": "",
        ...             "guard_expr": "python:here.guard_handler('store')",
        ...         }
        ...     },
        ... }
        >>> update_workflow(SAMPLE_WORKFLOW, states=states, transitions=trans)

    :param workflow: Workflow object or workflow id
    :type workflow: DCWorkflowDefinition/string
    :param states: states to be updated/created
    :type states: dict of {state_id:{<state_properties>}}
    :param transitions: transitions to be updated/created
    :type transitions: dict of {transition_id:<transition_properties>}
    :param title: (optional) the title of the workflow
    :type title: string
    :param description: (optional) the description of the workflow
    :type description: string
    :param initial_state: (optional) the initial status id of the workflow
    :type initial_state: string
    """
    wf = get_workflow(workflow)

    # Basic info: a setting that is not passed-in keeps its current value
    for name in ("title", "description", "initial_state"):
        setattr(wf, name, kwargs.get(name, getattr(wf, name)))

    # States: create the missing ones, then apply the settings passed-in
    for state_id, settings in (states or {}).items():
        if not wf.states.get(state_id):
            wf.states.addState(state_id)
        update_state(wf.states.get(state_id), **settings)

    # Transitions: create the missing ones, then apply the settings passed-in
    for transition_id, settings in (transitions or {}).items():
        if not wf.transitions.get(transition_id):
            wf.transitions.addTransition(transition_id)
        update_transition(wf.transitions.get(transition_id), **settings)
196
197
198
def update_state(state, transitions=None, permissions=None, **kwargs):
    """Updates the state of an existing workflow

    Note that regarding the use of tuples/lists for roles in permissions and
    in transitions, the same principles from DCWorkflow apply. This is:

    - Transitions passed-in as a list extend the existing ones
    - Transitions passed-in as a tuple replace the existing ones
    - Roles passed-in as a list extend the existing ones and acquire is kept
    - Roles passed-in as a tuple replace the existing ones and acquire is '0'

    Usage::

        >>> from senaite.core import permissions
        >>> from senaite.core.workflow import SAMPLE_WORKFLOW

        >>> # Use tuples to overwrite existing transitions. To extend
        >>> # existing transitions, use a list
        >>> trans = ("recover", "detach", "dispatch", )

        >>> # Use tuples to override existing roles per permission and to also
        >>> # set acquire to False. To extend existing roles and preserve
        >>> # acquire, use a list
        >>> perms = {
        ...     permissions.TransitionCancelAnalysisRequest: (),
        ...     permissions.TransitionReinstateAnalysisRequest: (),
        ... }

        >>> kwargs = {
        ...     "title": "Stored",
        ...     "description": "Sample is stored",
        ...     # Copy permissions from sample_received first
        ...     "permissions_copy_from": "sample_received",
        ... }

        >>> state = get_state(SAMPLE_WORKFLOW, "stored")
        >>> update_state(state, transitions=trans, permissions=perms, **kwargs)

    :param state: Workflow state definition object
    :type state: Products.DCWorkflow.States.StateDefinition
    :param transitions: Tuple or list of ids from transitions to be considered
        as exit transitions of the state. If a tuple, existing transitions are
        replaced by the new ones. If a list, existing transitions are extended
        with the new ones: the existing order is kept and the new ids are
        appended, without duplicates. If None, keeps the original transitions.
    :type transitions: list[string]
    :param permissions: dict of {permission_id:roles} where 'roles' can be a
        tuple or a list. If a tuple, existing roles are replaced by new ones
        and acquired is set to 'False'. If a list, existing roles are extended
        with the new ones and acquired is not changed. If None, keeps the
        original permissions
    :type permissions: dict({string:tuple|list})
    :param title: (optional) the title of the state
    :type title: string
    :param description: (optional) the description of the state
    :type description: string
    :param permissions_copy_from: (optional) the id of the status to copy the
        permissions from to the given status. When set, the permissions from
        the source status are copied to the given status before the update of
        the permissions with those passed-in takes place.
    :type permissions_copy_from: string
    :raises ValueError: if the status set in permissions_copy_from does not
        exist in the workflow of the state
    """
    # set basic info (title, description, etc.)
    state.title = kwargs.get("title", state.title)
    state.description = kwargs.get("description", state.description)

    # check if we need to replace or extend existing transitions
    if transitions is None:
        transitions = state.transitions
    elif isinstance(transitions, list):
        # Extend: keep the existing transitions in their current order and
        # append the new ones. A set is not used on purpose, because it would
        # make the order of the stored transitions arbitrary
        merged = []
        for transition_id in list(state.transitions or ()) + transitions:
            if transition_id not in merged:
                merged.append(transition_id)
        transitions = tuple(merged)

    # set transitions
    state.transitions = transitions

    # copy permissions from another state
    source_id = kwargs.get("permissions_copy_from")
    if source_id:
        wf = get_workflow(state)
        # get_state raises a ValueError if the source state does not exist,
        # instead of failing later on with an AttributeError on None
        source = get_state(wf, source_id)
        copy_permissions(source, state)

    # update existing permissions
    permissions = permissions or {}
    for perm_id, roles in permissions.items():
        update_permission(state, perm_id, roles)
285
286
287
def update_transition(transition, **kwargs):
    """Updates a workflow transition

    Properties that are not passed-in (or passed-in as None) keep the value
    the transition currently has. The current guard of the transition is
    kept and only updated with the guard properties passed-in, if any.

    Usage::

        >>> from senaite.core.workflow import SAMPLE_WORKFLOW

        >>> guard = {
        ...     "guard_permissions": "",
        ...     "guard_roles": "",
        ...     "guard_expr": "python:here.guard_handler('store')",
        ... }

        >>> wf = get_workflow(SAMPLE_WORKFLOW)
        >>> transition = wf.transitions.get("store")
        >>> update_transition(transition,
        ...                   title="Store",
        ...                   description="The action to store the sample",
        ...                   action="Store sample",
        ...                   new_state="stored",
        ...                   guard=guard)

    :param transition: Workflow transition definition object
    :type transition: Products.DCWorkflow.Transitions.TransitionDefinition
    :param title: (optional) the title of the transition
    :type title: string
    :param description: (optional) the description of the transition
    :type description: string
    :param new_state: (optional) the state of the object after the transition
    :type new_state: string
    :param after_script: (optional) Script (Python) to run after the transition
    :type after_script: string
    :param action: (optional) the action name to display in the actions box
    :type action: string
    :param action_url: (optional) the url to use for this action. The
        %(content_url) wildcard is replaced by absolute path at runtime. If
        empty, the default `content_status_modify?workflow_action=<action_id>`
        will be used at runtime
    :type action_url: string
    :param guard: (optional) a dict with Guard properties. The supported
        properties are: 'guard_roles', 'guard_groups', 'guard_expr' and
        'guard_permissions'
    :type guard: dict
    """
    # Keep a reference to the current guard *before* the properties are set.
    # NOTE(review): DCWorkflow's TransitionDefinition.setProperties is
    # understood to rebuild the guard from its `props`/`REQUEST` arguments,
    # leaving the transition without guard when none is given - confirm
    # against the DCWorkflow version in use. Reading the guard first is
    # harmless otherwise.
    guard = transition.guard or Guard()

    # attrs conversions
    mapping = {
        "title": "title",
        "description": "description",
        "action": "actbox_name",
        "action_url": "actbox_url",
        "after_script": "after_script_name",
        "new_state": "new_state_id",
    }
    properties = {}
    for key, property_id in mapping.items():
        # fall back to the current value when not passed-in or None
        current = getattr(transition, property_id) or ""
        value = kwargs.get(key)
        properties[property_id] = current if value is None else value

    # update the transition properties
    transition.setProperties(**properties)

    # update the guard and (re)assign it to the transition
    guard_props = kwargs.get("guard")
    if guard_props:
        guard.changeFromProperties(guard_props)
    transition.guard = guard
357
358
359
def update_permission(state, permission_id, roles):
    """Grants a permission to the given roles for a workflow state

    The permission is also added to the permissions managed by the workflow
    the state belongs to, if not there yet.

    :param state: Workflow state definition object
    :type state: Products.DCWorkflow.States.StateDefinition
    :param permission_id: id of the permission
    :type permission_id: string
    :param roles: List or tuple with roles to which the given permission has
        to be granted for the workflow status. If a tuple, acquire is set to
        False and roles overwritten. Thus, an empty tuple clears all roles for
        this permission and state. If a list, the existing roles are extended
        with new ones and acquire setting is not modified.
    :type roles: list/tuple
    """
    if isinstance(roles, tuple):
        # replace mode: roles are taken as they come and acquire is disabled
        acquire = 0
        granted = roles
    else:
        # extend mode: merge with the current roles and keep acquire as it is
        current = state.getPermissionInfo(permission_id) or {}
        acquire = current.get("acquired", 1)
        granted = set(roles)
        granted.update(current.get("roles", []))

    # sorted roles are easier to read on retrieval
    ordered = tuple(sorted(granted))

    # the workflow has to manage the permission as well
    workflow = get_workflow(state)
    if permission_id not in workflow.permissions:
        workflow.permissions = workflow.permissions + (permission_id,)

    # store the permission mapping in the state
    message = "{}.{}: '{}' (acquired={}): '{}'".format(
        workflow.id, state.id, permission_id, repr(acquire),
        ', '.join(ordered))
    logger.info(message)
    state.setPermission(permission_id, acquire, ordered)
394
395
396
def copy_permissions(source, destination):
    """Replicates the permission mappings of a workflow state into another

    For each permission found in the source state, the roles (sorted) and the
    acquire setting are set to the destination state.

    :param source: Workflow state definition object used as source
    :type source: Products.DCWorkflow.States.StateDefinition
    :param destination: Workflow state definition object used as destination
    :type destination: Products.DCWorkflow.States.StateDefinition
    """
    for perm_id in source.permissions:
        mapping = source.getPermissionInfo(perm_id)
        acquire = mapping.get("acquired", 1)
        granted = mapping.get("roles")
        # no roles (missing or empty) are stored as an empty list
        ordered = sorted(granted) if granted else []
        destination.setPermission(perm_id, acquire, ordered)
410
411
412
def is_transition_allowed(obj, transition_id):
    """Checks if the workflow of the object supports the given transition
    for the object at this moment.

    :param obj: object to evaluate the transition against
    :type obj: ATContentType/DexterityContentType/CatalogBrain/UID
    :param transition_id: Workflow transition id
    :type transition_id: string
    :returns: True if the transition with the given id can be performed
    :rtype: bool
    """
    target = api.get_object(obj)
    workflow = get_workflow(target)
    return bool(workflow.isActionSupported(target, transition_id))
428
429
430
def check_guard(obj, transition_id):
    """Evaluates the guard's expression of a transition against the object

    Only the expression of the guard is evaluated. A transition without guard
    or with a guard without expression is considered as satisfied.

    :param obj: object to evaluate the guard against
    :type obj: ATContentType/DexterityContentType/CatalogBrain/UID
    :param transition_id: Workflow transition id
    :type transition_id: string
    :returns: True if the guard expression evaluates to True
    :rtype: bool
    """
    content = api.get_object(obj)
    workflow = get_workflow(content)

    # pick the TALES expression from the guard of the transition, if any
    guard = get_transition(workflow, transition_id).guard
    expression = guard.expr if guard else None
    if not expression:
        # nothing to evaluate
        return True

    # the expression context provides the names for TALES expressions
    econtext = createExprContext(StateChangeInfo(content, workflow))
    return bool(expression(econtext))
460