1
|
|
|
# -*- coding: utf-8 -*- |
2
|
|
|
# |
3
|
|
|
# This file is part of SENAITE.CORE. |
4
|
|
|
# |
5
|
|
|
# SENAITE.CORE is free software: you can redistribute it and/or modify it under |
6
|
|
|
# the terms of the GNU General Public License as published by the Free Software |
7
|
|
|
# Foundation, version 2. |
8
|
|
|
# |
9
|
|
|
# This program is distributed in the hope that it will be useful, but WITHOUT |
10
|
|
|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
11
|
|
|
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
12
|
|
|
# details. |
13
|
|
|
# |
14
|
|
|
# You should have received a copy of the GNU General Public License along with |
15
|
|
|
# this program; if not, write to the Free Software Foundation, Inc., 51 |
16
|
|
|
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
17
|
|
|
# |
18
|
|
|
# Copyright 2018-2025 by it's authors. |
19
|
|
|
# Some rights reserved, see README and LICENSE. |
20
|
|
|
from Products.DCWorkflow.DCWorkflow import DCWorkflowDefinition |
21
|
|
|
from Products.DCWorkflow.Expression import StateChangeInfo |
22
|
|
|
from Products.DCWorkflow.Expression import createExprContext |
23
|
|
|
from Products.DCWorkflow.Guard import Guard |
24
|
|
|
from Products.DCWorkflow.States import StateDefinition |
25
|
|
|
from Products.DCWorkflow.Transitions import TransitionDefinition |
26
|
|
|
from bika.lims import api |
27
|
|
|
from bika.lims.api import _marker |
28
|
|
|
from senaite.core import logger |
29
|
|
|
|
30
|
|
|
|
31
|
|
|
def get_workflow(thing, default=_marker):
    """Resolves the primary DCWorkflowDefinition for the thing passed-in

    The look-up is attempted in this order: the thing itself when it already
    is a workflow, the workflow that owns the state or transition, the
    workflow with the given id and, finally, the workflow chain assigned to
    the portal type or object. The chain is only used when it contains
    exactly one workflow.

    :param thing: A single catalog brain, content object, supermodel, workflow,
        workflow id, workflow state, workflow transition or portal type
    :type thing: DCWorkflowDefinition/StateDefinition/TransitionDefinition/
        ATContentType/DexterityContentType/CatalogBrain/SuperModel/string
    :param default: (optional) fallback used when the workflow cannot be
        resolved. None is returned as-is; any other value is resolved to a
        workflow by calling this same function with it
    :return: The primary workflow of the thing
    :rtype: Products.DCWorkflow.DCWorkflow.DCWorkflowDefinition
    :raises ValueError: when no default is given and the workflow is not
        found, the chain has more than one workflow or the type of the
        thing is not supported
    """
    if isinstance(thing, DCWorkflowDefinition):
        return thing

    # states and transitions both know the workflow they belong to
    if isinstance(thing, (StateDefinition, TransitionDefinition)):
        return thing.getWorkflow()

    if api.is_string(thing):
        # the string might be the id of a workflow
        tool = api.get_tool("portal_workflow")
        by_id = tool.getWorkflowById(thing)
        if by_id:
            return by_id

    if api.is_string(thing) or api.is_object(thing):
        # the thing might be a portal type or an object with a chain
        tool = api.get_tool("portal_workflow")
        chain = tool.getChainFor(thing)
        if len(chain) == 1:
            return tool.getWorkflowById(chain[0])
        if len(chain) > 1:
            template = "More than one workflow: %s"
        else:
            template = "Workflow not found: %s"
        subject = thing
    else:
        template = "Type is not supported: %s"
        subject = type(thing)

    # nothing resolved: rely on the default, if any
    if default is _marker:
        raise ValueError(template % repr(subject))
    if default is None:
        return default
    return get_workflow(default)
75
|
|
|
|
76
|
|
|
|
77
|
|
|
def get_state(workflow, state_id, default=_marker):
    """Looks up a state of the workflow by its id

    :param workflow: Workflow object or workflow id
    :type workflow: DCWorkflowDefinition/string
    :param state_id: Workflow state id
    :type state_id: string
    :param default: (optional) value to return when the state is not found
    :return: The state object for the given workflow and id
    :rtype: Products.DCWorkflow.States.StateDefinition
    :raises ValueError: when the state is not found and no default is given
    """
    owner = get_workflow(workflow)
    found = owner.states.get(state_id)
    if not found:
        if default is _marker:
            raise ValueError(
                "State %s not found for %s" % (state_id, owner.id))
        return default
    return found
93
|
|
|
|
94
|
|
|
|
95
|
|
|
def get_transition(workflow, transition_id, default=_marker):
    """Looks up a transition of the workflow by its id

    :param workflow: Workflow object or workflow id
    :type workflow: DCWorkflowDefinition/string
    :param transition_id: Workflow transition id
    :type transition_id: string
    :param default: (optional) value to return when the transition is not
        found
    :return: The transition object for the given workflow and id
    :rtype: Products.DCWorkflow.Transitions.TransitionDefinition
    :raises ValueError: when the transition is not found and no default is
        given
    """
    owner = get_workflow(workflow)
    found = owner.transitions.get(transition_id)
    if not found:
        if default is _marker:
            raise ValueError(
                "Transition %s not found for %s" % (transition_id, owner.id))
        return default
    return found
111
|
|
|
|
112
|
|
|
|
113
|
|
|
def update_workflow(workflow, states=None, transitions=None, **kwargs):
    """Updates the settings, states and transitions of an existing workflow

    States and transitions that do not exist yet in the workflow are created
    first and then updated with the properties passed-in.

    Usage::

        >>> from senaite.core import permissions
        >>> from senaite.core.workflow import SAMPLE_WORKFLOW
        >>> states = {
        ...     "stored": {
        ...         "title": "Stored",
        ...         "description": "Sample is stored",
        ...         # Use tuples to overwrite existing transitions. To extend
        ...         # existing transitions, use a list
        ...         "transitions": ("recover", "detach", "dispatch", ),
        ...         # Copy permissions from sample_received first
        ...         "permissions_copy_from": "sample_received",
        ...         # Permissions mapping
        ...         "permissions": {
        ...             # Use tuples to overwrite existing and acquire=False.
        ...             # To extend existing roles, use a list
        ...             permissions.TransitionCancelAnalysisRequest: (),
        ...             permissions.TransitionReinstateAnalysisRequest: (),
        ...         }
        ...     },
        ... }
        >>> trans = {
        ...     "store": {
        ...         "title": "Store",
        ...         "new_state": "stored",
        ...         "action": "Store sample",
        ...         "guard": {
        ...             "guard_permissions": "",
        ...             "guard_roles": "",
        ...             "guard_expr": "python:here.guard_handler('store')",
        ...         }
        ...     },
        ... }
        >>> update_workflow(SAMPLE_WORKFLOW, states=states, transitions=trans)

    :param workflow: Workflow object or workflow id
    :type workflow: DCWorkflowDefinition/string
    :param states: states to be updated/created
    :type states: dict of {state_id:{<state_properties>}}
    :param transitions: transitions to be updated/created
    :type transitions: dict of {transition_id:<transition_properties>}
    :param title: (optional) the title of the workflow. The current title
        is kept when not passed
    :type title: string
    :param description: (optional) the description of the workflow. The
        current description is kept when not passed
    :type description: string
    :param initial_state: (optional) the initial status id of the workflow.
        The current initial status is kept when not passed
    :type initial_state: string
    """
    def ensure(container, item_id, adder):
        """Returns the item with the given id, creating it when missing
        """
        item = container.get(item_id)
        if not item:
            getattr(container, adder)(item_id)
            item = container.get(item_id)
        return item

    wf = get_workflow(workflow)

    # basic info: keep the current value for anything not passed-in
    for name in ("title", "description", "initial_state"):
        setattr(wf, name, kwargs.get(name, getattr(wf, name)))

    # create (if necessary) and update the states
    for state_id, settings in (states or {}).items():
        update_state(ensure(wf.states, state_id, "addState"), **settings)

    # create (if necessary) and update the transitions
    for transition_id, settings in (transitions or {}).items():
        target = ensure(wf.transitions, transition_id, "addTransition")
        update_transition(target, **settings)
196
|
|
|
|
197
|
|
|
|
198
|
|
|
def update_state(state, transitions=None, permissions=None, **kwargs):
    """Updates the state of an existing workflow

    Note that regarding the use of tuples/lists for roles in permissions and
    in transitions, the same principles from DCWorkflow apply. This is:

    - Transitions passed-in as a list extend the existing ones
    - Transitions passed-in as a tuple replace the existing ones
    - Roles passed-in as a list extend the existing ones and acquire is kept
    - Roles passed-in as a tuple replace the existing ones and acquire is '0'

    Usage::

        >>> from senaite.core import permissions
        >>> from senaite.core.workflow import SAMPLE_WORKFLOW

        >>> # Use tuples to overwrite existing transitions. To extend
        >>> # existing transitions, use a list
        >>> trans = ("recover", "detach", "dispatch", )

        >>> # Use tuples to override existing roles per permission and to also
        >>> # set acquire to False. To extend existing roles and preserve
        >>> # acquire, use a list
        >>> perms = {
        ...     permissions.TransitionCancelAnalysisRequest: (),
        ...     permissions.TransitionReinstateAnalysisRequest: (),
        ... }

        >>> kwargs = {
        ...     "title": "Stored",
        ...     "description": "Sample is stored",
        ...     # Copy permissions from sample_received first
        ...     "permissions_copy_from": "sample_received",
        ... }

        >>> state = get_state(SAMPLE_WORKFLOW, "stored")
        >>> update_state(state, transitions=trans, permissions=perms, **kwargs)

    :param state: Workflow state definition object
    :type state: Products.DCWorkflow.States.StateDefinition
    :param transitions: Tuple or list of ids from transitions to be considered
        as exit transitions of the state. If a tuple, existing transitions are
        replaced by the new ones. If a list, existing transitions are extended
        with the new ones: the existing ones keep their position, the new
        ones are appended in the order given and duplicates are dropped. If
        None, keeps the original transitions.
    :type transitions: tuple[string]/list[string]
    :param permissions: dict of {permission_id:roles} where 'roles' can be a
        tuple or a list. If a tuple, existing roles are replaced by new ones
        and acquired is set to 'False'. If a list, existing roles are extended
        with the new ones and acquired is not changed. If None, keeps the
        original permissions
    :type permissions: dict({string:tuple|list})
    :param title: (optional) the title of the state. The current title is
        kept when not passed
    :type title: string
    :param description: (optional) the description of the state. The current
        description is kept when not passed
    :type description: string
    :param permissions_copy_from: (optional) the id of the status to copy the
        permissions from to the given status. When set, the permissions from
        the source status are copied to the given status before the update of
        the permissions with those passed-in takes place.
    :type permissions_copy_from: string
    :raises ValueError: when the status set in permissions_copy_from does not
        exist in the workflow of the state
    """
    # set basic info (title, description, etc.)
    state.title = kwargs.get("title", state.title)
    state.description = kwargs.get("description", state.description)

    # check if we need to replace or extend existing transitions
    if transitions is None:
        transitions = state.transitions
    elif isinstance(transitions, list):
        # extend: merge in a stable order (existing first, then the new
        # ones) instead of relying on the arbitrary ordering of a set, so
        # the value stored is the same on every run
        merged = []
        seen = set()
        for transition_id in tuple(state.transitions) + tuple(transitions):
            if transition_id in seen:
                continue
            seen.add(transition_id)
            merged.append(transition_id)
        transitions = tuple(merged)

    # set transitions
    state.transitions = transitions

    # copy permissions from another state
    source_id = kwargs.get("permissions_copy_from")
    if source_id:
        wf = get_workflow(state)
        source = wf.states.get(source_id)
        if source is None:
            # fail with a meaningful message instead of an AttributeError
            # raised later while reading the permissions from None
            raise ValueError(
                "State %s not found for %s" % (source_id, wf.id))
        copy_permissions(source, state)

    # update existing permissions
    permissions = permissions or {}
    for perm_id, roles in permissions.items():
        update_permission(state, perm_id, roles)
285
|
|
|
|
286
|
|
|
|
287
|
|
|
def update_transition(transition, **kwargs):
    """Updates a workflow transition

    Properties that are not passed-in (or passed-in as None) keep the value
    the transition currently has. Likewise, the current guard of the
    transition is kept when no guard properties are passed-in.

    Usage::

        >>> from senaite.core.workflow import SAMPLE_WORKFLOW

        >>> guard = {
        ...     "guard_permissions": "",
        ...     "guard_roles": "",
        ...     "guard_expr": "python:here.guard_handler('store')",
        ... }

        >>> wf = get_workflow(SAMPLE_WORKFLOW)
        >>> transition = wf.transitions.get("store")
        >>> update_transition(transition,
        ...                   title="Store",
        ...                   description="The action to store the sample",
        ...                   action="Store sample",
        ...                   new_state="stored",
        ...                   guard=guard)

    :param transition: Workflow transition definition object
    :type transition: Products.DCWorkflow.Transitions.TransitionDefinition
    :param title: (optional) the title of the transition
    :type title: string
    :param description: (optional) the description of the transition
    :type description: string
    :param new_state: (optional) the state of the object after the transition
    :type new_state: string
    :param after_script: (optional) Script (Python) to run after the transition
    :type after_script: string
    :param action: (optional) the action name to display in the actions box
    :type action: string
    :param action_url: (optional) the url to use for this action. The
        %(content_url) wildcard is replaced by absolute path at runtime. If
        empty, the default `content_status_modify?workflow_action=<action_id>`
        will be used at runtime
    :type action_url: string
    :param guard: (optional) a dict with Guard properties. The supported
        properties are: 'guard_roles', 'guard_groups', 'guard_expr' and
        'guard_permissions'
    :type guard: dict
    """
    # attrs conversions: keyword accepted here -> transition property id
    mapping = {
        "title": "title",
        "description": "description",
        "action": "actbox_name",
        "action_url": "actbox_url",
        "after_script": "after_script_name",
        "new_state": "new_state_id",
    }
    properties = {}
    for key, property_id in mapping.items():
        # fall back to the current value when not passed-in or None
        default = getattr(transition, property_id) or ""
        value = kwargs.get(key, None)
        if value is None:
            value = default
        properties[property_id] = value

    # Remember the current guard before the properties are updated.
    # DCWorkflow's setProperties() re-creates the guard from the 'props' it
    # receives and, since none are passed here, the transition comes back
    # without a guard. Without this, updating e.g. only the title would
    # silently remove the guard of the transition.
    previous_guard = transition.guard

    # update the transition properties
    # NOTE(review): only the mapped properties are handed to setProperties;
    # other settings it accepts (e.g. the trigger type) presumably fall back
    # to its defaults -- confirm against DCWorkflow
    transition.setProperties(**properties)

    # update the guard
    guard_props = kwargs.get("guard")
    if guard_props:
        guard = transition.guard or Guard()
        guard.changeFromProperties(guard_props)
    else:
        # no guard properties passed-in: keep the guard as it was
        guard = transition.guard or previous_guard or Guard()
    transition.guard = guard
357
|
|
|
|
358
|
|
|
|
359
|
|
|
def update_permission(state, permission_id, roles):
    """Sets the roles granted for a permission in a workflow state

    The permission is also added to the permissions of the workflow the
    state belongs to when it is not there yet, and the resulting mapping is
    logged at info level.

    :param state: Workflow state definition object
    :type state: Products.DCWorkflow.States.StateDefinition
    :param permission_id: id of the permission
    :type permission_id: string
    :param roles: List or tuple with roles to which the given permission has
        to be granted for the workflow status. If a tuple, acquire is set to
        False and roles overwritten. Thus, an empty tuple clears all roles for
        this permission and state. If a list, the existing roles are extended
        with new ones and acquire setting is not modified.
    :type roles: list/tuple
    """
    if isinstance(roles, tuple):
        # replace: roles are taken as they are and acquire is disabled
        acquire = 0
        granted = roles
    else:
        # extend: merge with the current roles and keep the acquire setting
        current = state.getPermissionInfo(permission_id) or {}
        acquire = current.get("acquired", 1)
        granted = set(roles).union(current.get("roles", []))

    # sorted, so they are easier to read when retrieved
    granted = tuple(sorted(granted))

    # the workflow has to manage the permission as well
    wf = get_workflow(state)
    if permission_id not in wf.permissions:
        wf.permissions += (permission_id,)

    # store the mapping in the state
    message = "{}.{}: '{}' (acquired={}): '{}'".format(
        wf.id, state.id, permission_id, repr(acquire), ', '.join(granted))
    logger.info(message)
    state.setPermission(permission_id, acquire, granted)
394
|
|
|
|
395
|
|
|
|
396
|
|
|
def copy_permissions(source, destination):
    """Replicates the permission mappings of a workflow state into another

    For each permission of the source, the roles (sorted) and the acquire
    setting are set to the destination. Acquire defaults to 1 when the
    source does not provide it.

    :param source: Workflow state definition object used as source
    :type source: Products.DCWorkflow.States.StateDefinition
    :param destination: Workflow state definition object used as destination
    :type destination: Products.DCWorkflow.States.StateDefinition
    """
    for perm_id in source.permissions:
        settings = source.getPermissionInfo(perm_id)
        role_ids = sorted(settings.get("roles") or [])
        acquire = settings.get("acquired", 1)
        # apply the same mapping to the destination state
        destination.setPermission(perm_id, acquire, role_ids)
410
|
|
|
|
411
|
|
|
|
412
|
|
|
def is_transition_allowed(obj, transition_id):
    """Tells whether the transition with the given id can be performed
    against the object, as reported by the workflow of the object

    :param obj: object to evaluate the transition against
    :type obj: ATContentType/DexterityContentType/CatalogBrain/UID
    :param transition_id: Workflow transition id
    :type transition_id: string
    :returns: True if the transition with the given id can be performed
    :rtype: bool
    """
    target = api.get_object(obj)
    supported = get_workflow(target).isActionSupported(target, transition_id)
    return bool(supported)
428
|
|
|
|
429
|
|
|
|
430
|
|
|
def check_guard(obj, transition_id):
    """Tells whether the expression of the guard for the given object and
    transition evaluates to True

    A transition without guard, or with a guard without expression, is
    considered as met.

    :param obj: object to evaluate the guard against
    :type obj: ATContentType/DexterityContentType/CatalogBrain/UID
    :param transition_id: Workflow transition id
    :type transition_id: string
    :returns: True if the guard expression evaluates to True
    :rtype: bool
    """
    target = api.get_object(obj)
    wf = get_workflow(target)

    # the TALES expression of the guard of the transition, if any
    guard = get_transition(wf, transition_id).guard
    expression = guard.expr if guard else None
    if not expression:
        return True

    # the expression context provides the names for TALES expressions
    econtext = createExprContext(StateChangeInfo(target, wf))
    return bool(expression(econtext))
460
|
|
|
|