Test Setup Failed
Pull Request — master (#4154)
by W
03:37
created

update_task_flow()   B

Complexity

Conditions 5

Size

Total Lines 44

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 5
c 1
b 0
f 0
dl 0
loc 44
rs 8.0894
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
18
import copy
19
import retrying
20
21
from orchestra import conducting
22
from orchestra.specs import loader as specs_loader
23
from orchestra import states
24
25
from st2common.constants import action as ac_const
26
from st2common.exceptions import action as ac_exc
27
from st2common.exceptions import workflow as wf_exc
28
from st2common import log as logging
29
from st2common.models.db import liveaction as lv_db_models
30
from st2common.models.db import workflow as wf_db_models
31
from st2common.persistence import liveaction as lv_db_access
32
from st2common.persistence import execution as ex_db_access
33
from st2common.persistence import workflow as wf_db_access
34
from st2common.services import action as ac_svc
35
from st2common.services import executions as ex_svc
36
from st2common.util import action_db as ac_db_util
37
from st2common.util import date as date_utils
38
from st2common.util import param as param_utils
39
40
41
LOG = logging.getLogger(__name__)
42
43
44
def request(wf_def, ac_ex_db):
45
    # Load workflow definition into workflow spec model.
46
    spec_module = specs_loader.get_spec_module('native')
47
    wf_spec = spec_module.instantiate(wf_def)
48
49
    # Inspect the workflow spec.
50
    wf_spec.inspect(raise_exception=True)
51
52
    # Identify the action to execute.
53
    action_db = ac_db_util.get_action_by_ref(ref=ac_ex_db.action['ref'])
54
55
    if not action_db:
56
        error = 'Unable to find action "%s".' % ac_ex_db.action['ref']
57
        raise ac_exc.InvalidActionReferencedException(error)
58
59
    # Identify the runner for the action.
60
    runner_type_db = ac_db_util.get_runnertype_by_name(action_db.runner_type['name'])
61
62
    # Render action execution parameters.
63
    runner_params, action_params = param_utils.render_final_params(
64
        runner_type_db.runner_parameters,
65
        action_db.parameters,
66
        ac_ex_db.parameters,
67
        ac_ex_db.context
68
    )
69
70
    # Instantiate the workflow conductor.
71
    conductor = conducting.WorkflowConductor(wf_spec, **action_params)
72
    conductor.set_workflow_state(states.REQUESTED)
73
74
    # Serialize the conductor which initializes some internal values.
75
    data = conductor.serialize()
76
77
    # Create a record for workflow execution.
78
    wf_ex_db = wf_db_models.WorkflowExecutionDB(
79
        action_execution=str(ac_ex_db.id),
80
        spec=data['spec'],
81
        graph=data['graph'],
82
        flow=data['flow'],
83
        input=data['input'],
84
        output=data['output'],
85
        errors=data['errors'],
86
        status=data['state']
87
    )
88
89
    # Insert new record into the database and publish to the message bus.
90
    wf_ex_db = wf_db_access.WorkflowExecution.insert(wf_ex_db, publish=True)
91
92
    return wf_ex_db
93
94
95
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
96
def request_pause(ac_ex_db):
97
    wf_ex_dbs = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))
98
99
    if not wf_ex_dbs:
100
        raise wf_exc.WorkflowExecutionNotFoundException(str(ac_ex_db.id))
101
102
    if len(wf_ex_dbs) > 1:
103
        raise wf_exc.AmbiguousWorkflowExecutionException(str(ac_ex_db.id))
104
105
    wf_ex_db = wf_ex_dbs[0]
106
107
    if wf_ex_db.status in states.COMPLETED_STATES:
108
        raise wf_exc.WorkflowExecutionIsCompletedException(str(wf_ex_db.id))
109
110
    conductor = deserialize_conductor(wf_ex_db)
111
112
    if conductor.get_workflow_state() in states.COMPLETED_STATES:
113
        raise wf_exc.WorkflowExecutionIsCompletedException(str(wf_ex_db.id))
114
115
    conductor.set_workflow_state(states.PAUSED)
116
117
    # Write the updated workflow state and task flow to the database.
118
    wf_ex_db.status = conductor.get_workflow_state()
119
    wf_ex_db.flow = conductor.flow.serialize()
120
    wf_ex_db = wf_db_access.WorkflowExecution.update(wf_ex_db, publish=False)
121
122
    return wf_ex_db
123
124
125
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
126
def request_resume(ac_ex_db):
127
    wf_ex_dbs = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))
128
129
    if not wf_ex_dbs:
130
        raise wf_exc.WorkflowExecutionNotFoundException(str(ac_ex_db.id))
131
132
    if len(wf_ex_dbs) > 1:
133
        raise wf_exc.AmbiguousWorkflowExecutionException(str(ac_ex_db.id))
134
135
    wf_ex_db = wf_ex_dbs[0]
136
137
    if wf_ex_db.status in states.COMPLETED_STATES:
138
        raise wf_exc.WorkflowExecutionIsCompletedException(str(wf_ex_db.id))
139
140
    if wf_ex_db.status in states.RUNNING_STATES:
141
        raise wf_exc.WorkflowExecutionIsRunningException(str(wf_ex_db.id))
142
143
    conductor = deserialize_conductor(wf_ex_db)
144
145
    if conductor.get_workflow_state() in states.COMPLETED_STATES:
146
        raise wf_exc.WorkflowExecutionIsCompletedException(str(wf_ex_db.id))
147
148
    if conductor.get_workflow_state() in states.RUNNING_STATES:
149
        raise wf_exc.WorkflowExecutionIsRunningException(str(wf_ex_db.id))
150
151
    conductor.set_workflow_state(states.RESUMING)
152
153
    # Write the updated workflow state and task flow to the database.
154
    wf_ex_db.status = conductor.get_workflow_state()
155
    wf_ex_db.flow = conductor.flow.serialize()
156
    wf_ex_db = wf_db_access.WorkflowExecution.update(wf_ex_db, publish=False)
157
158
    # Publish state change.
159
    wf_db_access.WorkflowExecution.publish_status(wf_ex_db)
160
161
    return wf_ex_db
162
163
164
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
165
def request_cancellation(ac_ex_db):
166
    wf_ex_dbs = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))
167
168
    if not wf_ex_dbs:
169
        raise wf_exc.WorkflowExecutionNotFoundException(str(ac_ex_db.id))
170
171
    if len(wf_ex_dbs) > 1:
172
        raise wf_exc.AmbiguousWorkflowExecutionException(str(ac_ex_db.id))
173
174
    wf_ex_db = wf_ex_dbs[0]
175
176
    if wf_ex_db.status in states.COMPLETED_STATES:
177
        raise wf_exc.WorkflowExecutionIsCompletedException(str(wf_ex_db.id))
178
179
    conductor = deserialize_conductor(wf_ex_db)
180
181
    if conductor.get_workflow_state() in states.COMPLETED_STATES:
182
        raise wf_exc.WorkflowExecutionIsCompletedException(str(wf_ex_db.id))
183
184
    conductor.set_workflow_state(states.CANCELED)
185
186
    # Write the updated workflow state and task flow to the database.
187
    wf_ex_db.status = conductor.get_workflow_state()
188
    wf_ex_db.flow = conductor.flow.serialize()
189
    wf_ex_db = wf_db_access.WorkflowExecution.update(wf_ex_db, publish=False)
190
191
    # Cascade the cancellation up to the root of the workflow.
192
    root_ac_ex_db = ac_svc.get_root_execution(ac_ex_db)
193
194
    if root_ac_ex_db != ac_ex_db and root_ac_ex_db.status not in ac_const.LIVEACTION_CANCEL_STATES:
195
        root_lv_ac_db = lv_db_access.LiveAction.get(id=root_ac_ex_db.liveaction['id'])
196
        ac_svc.request_cancellation(root_lv_ac_db, None)
197
198
    return wf_ex_db
199
200
201
def request_task_execution(wf_ex_db, task_id, task_spec, task_ctx, st2_ctx):
202
    # Identify the action to execute.
203
    action_db = ac_db_util.get_action_by_ref(ref=task_spec.action)
204
205
    if not action_db:
206
        error = 'Unable to find action "%s".' % task_spec.action
207
        raise ac_exc.InvalidActionReferencedException(error)
208
209
    # Identify the runner for the action.
210
    runner_type_db = ac_db_util.get_runnertype_by_name(action_db.runner_type['name'])
211
212
    # Create a record for task execution.
213
    task_ex_db = wf_db_models.TaskExecutionDB(
214
        workflow_execution=str(wf_ex_db.id),
215
        task_name=task_spec.name or task_id,
216
        task_id=task_id,
217
        task_spec=task_spec.serialize(),
218
        initial_context=task_ctx,
219
        status=states.REQUESTED
220
    )
221
222
    # Insert new record into the database.
223
    task_ex_db = wf_db_access.TaskExecution.insert(task_ex_db, publish=False)
224
225
    # Set context for the action execution.
226
    ac_ex_ctx = {
227
        'parent': st2_ctx,
228
        'orchestra': {
229
            'workflow_execution_id': str(wf_ex_db.id),
230
            'task_execution_id': str(task_ex_db.id),
231
            'task_name': task_spec.name or task_id,
232
            'task_id': task_id
233
        }
234
    }
235
236
    # Render action execution parameters and setup action execution object.
237
    ac_ex_params = param_utils.render_live_params(
238
        runner_type_db.runner_parameters or {},
239
        action_db.parameters or {},
240
        getattr(task_spec, 'input', None) or {},
241
        ac_ex_ctx
242
    )
243
244
    lv_ac_db = lv_db_models.LiveActionDB(
245
        action=task_spec.action,
246
        workflow_execution=str(wf_ex_db.id),
247
        task_execution=str(task_ex_db.id),
248
        context=ac_ex_ctx,
249
        parameters=ac_ex_params
250
    )
251
252
    # Request action execution.
253
    ac_svc.request(lv_ac_db)
254
255
    # Sst the task execution to running.
256
    task_ex_db.status = states.RUNNING
257
    task_ex_db = wf_db_access.TaskExecution.update(task_ex_db, publish=False)
258
259
    return task_ex_db
260
261
262
def handle_action_execution_pause(ac_ex_db):
263
    # Check that the action execution is paused.
264
    if ac_ex_db.status != ac_const.LIVEACTION_STATUS_PAUSED:
265
        raise Exception(
266
            'Unable to handle pause of action execution. The action execution '
267
            '"%s" is in "%s" state.' % (str(ac_ex_db.id), ac_ex_db.status)
268
        )
269
270
    # Get related record identifiers.
271
    task_ex_id = ac_ex_db.context['orchestra']['task_execution_id']
272
273
    # Updat task execution
274
    update_task_execution(task_ex_id, ac_ex_db.status)
275
276
    # Update task flow in the workflow execution.
277
    update_task_flow(task_ex_id, publish=False)
278
279
280
def handle_action_execution_resume(ac_ex_db):
281
    if 'orchestra' not in ac_ex_db.context:
282
        raise Exception(
283
            'Unable to handle resume of action execution. The action execution '
284
            '%s is not an orchestra workflow task.' % str(ac_ex_db.id)
285
        )
286
287
    wf_ex_id = ac_ex_db.context['orchestra']['workflow_execution_id']
288
    task_ex_id = ac_ex_db.context['orchestra']['task_execution_id']
289
290
    # Updat task execution to running.
291
    resume_task_execution(task_ex_id)
292
293
    # Update workflow execution to running.
294
    resume_workflow_execution(wf_ex_id, task_ex_id)
295
296
    # If action execution has a parent, cascade status change upstream and do not publish
297
    # the status change because we do not want to trigger resume of other peer subworkflows.
298
    if 'parent' in ac_ex_db.context:
299
        parent_ac_ex_id = ac_ex_db.context['parent']['execution_id']
300
        parent_ac_ex_db = ex_db_access.ActionExecution.get_by_id(parent_ac_ex_id)
301
302
        if parent_ac_ex_db.status == ac_const.LIVEACTION_STATUS_PAUSED:
303
            ac_db_util.update_liveaction_status(
304
                liveaction_id=parent_ac_ex_db.liveaction['id'],
305
                status=ac_const.LIVEACTION_STATUS_RUNNING,
306
                publish=False)
307
308
        # If there are grand parents, handle the resume of the parent action execution.
309
        if 'orchestra' in parent_ac_ex_db.context and 'parent' in parent_ac_ex_db.context:
310
            handle_action_execution_resume(parent_ac_ex_db)
311
312
313
def handle_action_execution_completion(ac_ex_db):
314
    # Check that the action execution is completed.
315
    if ac_ex_db.status not in ac_const.LIVEACTION_COMPLETED_STATES:
316
        raise Exception(
317
            'Unable to handle completion of action execution. The action execution '
318
            '"%s" is in "%s" state.' % (str(ac_ex_db.id), ac_ex_db.status)
319
        )
320
321
    # Get related record identifiers.
322
    wf_ex_id = ac_ex_db.context['orchestra']['workflow_execution_id']
323
    task_ex_id = ac_ex_db.context['orchestra']['task_execution_id']
324
325
    # Update task execution if completed.
326
    update_task_execution(task_ex_id, ac_ex_db.status, ac_ex_db.result)
327
328
    # Request the next set of tasks if workflow execution is not complete.
329
    request_next_tasks(task_ex_id)
330
331
    # Update workflow execution if completed.
332
    update_workflow_execution(wf_ex_id)
333
334
335
def deserialize_conductor(wf_ex_db):
336
    data = {
337
        'spec': wf_ex_db.spec,
338
        'graph': wf_ex_db.graph,
339
        'state': wf_ex_db.status,
340
        'flow': wf_ex_db.flow,
341
        'input': wf_ex_db.input,
342
        'output': wf_ex_db.output,
343
        'errors': wf_ex_db.errors
344
    }
345
346
    return conducting.WorkflowConductor.deserialize(data)
347
348
349
def refresh_conductor(wf_ex_id):
350
    wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(wf_ex_id)
351
    conductor = deserialize_conductor(wf_ex_db)
352
353
    return conductor, wf_ex_db
354
355
356
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
357
def update_task_execution(task_ex_id, ac_ex_status, ac_ex_result=None):
358
    if ac_ex_status not in states.COMPLETED_STATES + [states.PAUSED]:
359
        return
360
361
    task_ex_db = wf_db_access.TaskExecution.get_by_id(task_ex_id)
362
    task_ex_db.status = ac_ex_status
363
    task_ex_db.result = ac_ex_result if ac_ex_result else task_ex_db.result
364
365
    if ac_ex_status in states.COMPLETED_STATES:
366
        task_ex_db.end_timestamp = date_utils.get_datetime_utc_now()
367
368
    wf_db_access.TaskExecution.update(task_ex_db, publish=False)
369
370
371
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
372
def update_task_flow(task_ex_id, publish=True):
373
    task_ex_db = wf_db_access.TaskExecution.get_by_id(task_ex_id)
374
375
    # Return if task execution is not completed or paused.
376
    if task_ex_db.status not in states.COMPLETED_STATES + [states.PAUSED]:
377
        return
378
379
    # Update task flow if task execution is completed or paused.
380
    conductor, wf_ex_db = refresh_conductor(task_ex_db.workflow_execution)
381
    conductor.update_task_flow(task_ex_db.task_id, task_ex_db.status, result=task_ex_db.result)
382
383
    # Update timestamp and output if workflow is completed.
384
    if conductor.get_workflow_state() in states.COMPLETED_STATES:
385
        wf_ex_db.end_timestamp = date_utils.get_datetime_utc_now()
386
        wf_ex_db.output = conductor.get_workflow_output()
387
388
    # Update workflow status and task flow and write changes to database.
389
    wf_ex_db.status = conductor.get_workflow_state()
390
    wf_ex_db.errors = copy.deepcopy(conductor.errors)
391
    wf_ex_db.flow = conductor.flow.serialize()
392
    wf_ex_db = wf_db_access.WorkflowExecution.update(wf_ex_db, publish=False)
393
394
    # Return if workflow execution is not completed or paused.
395
    if wf_ex_db.status not in states.COMPLETED_STATES + [states.PAUSED]:
396
        return
397
398
    # Update the corresponding liveaction and action execution for the workflow.
399
    wf_ac_ex_db = ex_db_access.ActionExecution.get_by_id(wf_ex_db.action_execution)
400
    wf_lv_ac_db = ac_db_util.get_liveaction_by_id(wf_ac_ex_db.liveaction['id'])
401
402
    result = {'output': wf_ex_db.output or None}
403
404
    if wf_ex_db.status in states.ABENDED_STATES:
405
        result['errors'] = wf_ex_db.errors
406
407
    wf_lv_ac_db = ac_db_util.update_liveaction_status(
408
        status=wf_ex_db.status,
409
        result=result,
410
        end_timestamp=wf_ex_db.end_timestamp,
411
        liveaction_db=wf_lv_ac_db,
412
        publish=publish)
413
414
    ex_svc.update_execution(wf_lv_ac_db, publish=publish)
415
416
417
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
418
def request_next_tasks(task_ex_id):
419
    task_ex_db = wf_db_access.TaskExecution.get_by_id(task_ex_id)
420
421
    # Return if task execution is not complete..
422
    if task_ex_db.status not in states.COMPLETED_STATES:
423
        return
424
425
    # Update task flow if task execution is completed.
426
    conductor, wf_ex_db = refresh_conductor(task_ex_db.workflow_execution)
427
    conductor.update_task_flow(task_ex_db.task_id, task_ex_db.status, result=task_ex_db.result)
428
429
    # Identify the list of next set of tasks.
430
    next_tasks = conductor.get_next_tasks(task_ex_db.task_id)
431
432
    # Mark the next tasks as running in the task flow.
433
    # The task should be marked before actual task execution.
434
    for task in next_tasks:
435
        conductor.update_task_flow(task['id'], states.RUNNING)
436
437
    # Update timestamp and output if workflow is completed.
438
    if conductor.get_workflow_state() in states.COMPLETED_STATES:
439
        wf_ex_db.end_timestamp = date_utils.get_datetime_utc_now()
440
        wf_ex_db.output = conductor.get_workflow_output()
441
442
    # Write the updated workflow state and task flow to the database.
443
    wf_ex_db.status = conductor.get_workflow_state()
444
    wf_ex_db.errors = copy.deepcopy(conductor.errors)
445
    wf_ex_db.flow = conductor.flow.serialize()
446
    wf_ex_db = wf_db_access.WorkflowExecution.update(wf_ex_db, publish=False)
447
448
    # Request task execution for the root tasks.
449
    for task in next_tasks:
450
        st2_ctx = {'execution_id': wf_ex_db.action_execution}
451
        request_task_execution(wf_ex_db, task['id'], task['spec'], task['ctx'], st2_ctx)
452
453
454
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
455
def update_workflow_execution(wf_ex_id):
456
    conductor, wf_ex_db = refresh_conductor(wf_ex_id)
457
458
    # There is nothing to update if workflow execution is not completed or paused.
459
    if conductor.get_workflow_state() not in states.COMPLETED_STATES + [states.PAUSED]:
460
        return
461
462
    # Update timestamp and output if workflow is completed.
463
    if conductor.get_workflow_state() in states.COMPLETED_STATES:
464
        wf_ex_db.end_timestamp = date_utils.get_datetime_utc_now()
465
        wf_ex_db.output = conductor.get_workflow_output()
466
467
    # Update workflow status and task flow and write changes to database.
468
    wf_ex_db.status = conductor.get_workflow_state()
469
    wf_ex_db.errors = copy.deepcopy(conductor.errors)
470
    wf_ex_db.flow = conductor.flow.serialize()
471
    wf_ex_db = wf_db_access.WorkflowExecution.update(wf_ex_db, publish=False)
472
473
    # Update the corresponding liveaction and action execution for the workflow.
474
    wf_ac_ex_db = ex_db_access.ActionExecution.get_by_id(wf_ex_db.action_execution)
475
    wf_lv_ac_db = ac_db_util.get_liveaction_by_id(wf_ac_ex_db.liveaction['id'])
476
477
    result = {'output': wf_ex_db.output or None}
478
479
    if wf_ex_db.status in states.ABENDED_STATES:
480
        result['errors'] = wf_ex_db.errors
481
482
    wf_lv_ac_db = ac_db_util.update_liveaction_status(
483
        status=wf_ex_db.status,
484
        result=result,
485
        end_timestamp=wf_ex_db.end_timestamp,
486
        liveaction_db=wf_lv_ac_db)
487
488
    ex_svc.update_execution(wf_lv_ac_db)
489
490
491
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
492
def resume_task_execution(task_ex_id):
493
    # Update task execution to running.
494
    task_ex_db = wf_db_access.TaskExecution.get_by_id(task_ex_id)
495
    task_ex_db.status = states.RUNNING
496
497
    # Write update to the database.
498
    wf_db_access.TaskExecution.update(task_ex_db, publish=False)
499
500
501
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
502
def resume_workflow_execution(wf_ex_id, task_ex_id):
503
    # Update workflow execution to running.
504
    conductor, wf_ex_db = refresh_conductor(wf_ex_id)
505
    conductor.set_workflow_state(states.RUNNING)
506
507
    # Update task execution in task flow to running.
508
    task_ex_db = wf_db_access.TaskExecution.get_by_id(task_ex_id)
509
    conductor.update_task_flow(task_ex_db.task_id, states.RUNNING)
510
511
    # Update workflow status and task flow and write changes to database.
512
    wf_ex_db.status = conductor.get_workflow_state()
513
    wf_ex_db.errors = copy.deepcopy(conductor.errors)
514
    wf_ex_db.flow = conductor.flow.serialize()
515
    wf_ex_db = wf_db_access.WorkflowExecution.update(wf_ex_db, publish=False)
516