Test Setup Failed
Pull Request — master (#4154)
by W
04:37
created

request_next_tasks()   B

Complexity

Conditions 5

Size

Total Lines 31

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 5
dl 0
loc 31
rs 8.0894
c 1
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
18
import copy
19
import retrying
20
21
from orchestra import conducting
22
from orchestra.specs import loader as specs_loader
23
from orchestra import states
24
25
from st2common.constants import action as ac_const
26
from st2common.exceptions import action as ac_exc
27
from st2common.exceptions import workflow as wf_exc
28
from st2common import log as logging
29
from st2common.models.db import liveaction as lv_db_models
30
from st2common.models.db import workflow as wf_db_models
31
from st2common.persistence import liveaction as lv_db_access
32
from st2common.persistence import execution as ex_db_access
33
from st2common.persistence import workflow as wf_db_access
34
from st2common.services import action as ac_svc
35
from st2common.services import executions as ex_svc
36
from st2common.util import action_db as ac_db_util
37
from st2common.util import date as date_utils
38
from st2common.util import param as param_utils
39
40
41
LOG = logging.getLogger(__name__)
42
43
44
def request(wf_def, ac_ex_db):
    """Create and persist a workflow execution record for an action execution.

    The workflow definition is loaded into a native workflow spec and inspected,
    the action execution parameters are rendered, and a workflow conductor is
    instantiated and put into the REQUESTED state. The serialized conductor is
    stored as a new workflow execution record.

    :param wf_def: Workflow definition handed to the native spec module.
    :param ac_ex_db: Action execution record that the workflow belongs to.
    :returns: The workflow execution record returned by the insert.
    :raises InvalidActionReferencedException: If the referenced action is not found.
    """
    # Build the workflow spec from the definition and validate it up front.
    native_specs = specs_loader.get_spec_module('native')
    wf_spec = native_specs.instantiate(wf_def)
    wf_spec.inspect(raise_exception=True)

    # Look up the action referenced by the action execution.
    action_ref = ac_ex_db.action['ref']
    action_db = ac_db_util.get_action_by_ref(ref=action_ref)

    if not action_db:
        raise ac_exc.InvalidActionReferencedException(
            'Unable to find action "%s".' % action_ref
        )

    # Look up the runner type of the action.
    runner_type_db = ac_db_util.get_runnertype_by_name(action_db.runner_type['name'])

    # Render the parameters; only the action parameters are fed to the conductor.
    _, action_params = param_utils.render_final_params(
        runner_type_db.runner_parameters,
        action_db.parameters,
        ac_ex_db.parameters,
        ac_ex_db.context
    )

    # Create the conductor in the requested state.
    conductor = conducting.WorkflowConductor(wf_spec, **action_params)
    conductor.set_workflow_state(states.REQUESTED)

    # Serializing the conductor also initializes some of its internal values.
    serialized = conductor.serialize()

    # Assemble the workflow execution record from the serialized conductor.
    new_wf_ex_db = wf_db_models.WorkflowExecutionDB(
        action_execution=str(ac_ex_db.id),
        spec=serialized['spec'],
        graph=serialized['graph'],
        flow=serialized['flow'],
        input=serialized['input'],
        output=serialized['output'],
        errors=serialized['errors'],
        status=serialized['state']
    )

    # Write the record to the database and publish it to the message bus.
    return wf_db_access.WorkflowExecution.insert(new_wf_ex_db, publish=True)
93
94
95
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
def request_pause(ac_ex_db):
    """Set the workflow execution of the given action execution to PAUSED.

    :param ac_ex_db: Action execution record whose workflow is to be paused.
    :returns: The updated workflow execution record.
    :raises WorkflowExecutionNotFoundException: If no workflow execution matches.
    :raises AmbiguousWorkflowExecutionException: If more than one matches.
    :raises WorkflowExecutionIsCompletedException: If the record or the
        deserialized conductor is already in a completed state.
    """
    ac_ex_id = str(ac_ex_db.id)
    matches = wf_db_access.WorkflowExecution.query(action_execution=ac_ex_id)

    # Exactly one workflow execution is expected per action execution.
    if not matches:
        raise wf_exc.WorkflowExecutionNotFoundException(ac_ex_id)

    if len(matches) > 1:
        raise wf_exc.AmbiguousWorkflowExecutionException(ac_ex_id)

    wf_ex_db = matches[0]

    # Check the stored status first, before deserializing the conductor.
    if wf_ex_db.status in states.COMPLETED_STATES:
        raise wf_exc.WorkflowExecutionIsCompletedException(str(wf_ex_db.id))

    conductor = deserialize_conductor(wf_ex_db)

    # Check the state again as reported by the conductor itself.
    if conductor.get_workflow_state() in states.COMPLETED_STATES:
        raise wf_exc.WorkflowExecutionIsCompletedException(str(wf_ex_db.id))

    conductor.set_workflow_state(states.PAUSED)

    # Persist the new workflow state and task flow without publishing.
    wf_ex_db.status = conductor.get_workflow_state()
    wf_ex_db.flow = conductor.flow.serialize()

    return wf_db_access.WorkflowExecution.update(wf_ex_db, publish=False)
123
124
125
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
def request_resume(ac_ex_db):
    """Set the workflow execution of the given action execution to RESUMING.

    :param ac_ex_db: Action execution record whose workflow is to be resumed.
    :returns: The updated workflow execution record.
    :raises WorkflowExecutionNotFoundException: If no workflow execution matches.
    :raises AmbiguousWorkflowExecutionException: If more than one matches.
    :raises WorkflowExecutionIsCompletedException: If the record or the
        deserialized conductor is in a completed state.
    :raises WorkflowExecutionIsRunningException: If the record or the
        deserialized conductor is in a running state.
    """
    ac_ex_id = str(ac_ex_db.id)
    matches = wf_db_access.WorkflowExecution.query(action_execution=ac_ex_id)

    # Exactly one workflow execution is expected per action execution.
    if not matches:
        raise wf_exc.WorkflowExecutionNotFoundException(ac_ex_id)

    if len(matches) > 1:
        raise wf_exc.AmbiguousWorkflowExecutionException(ac_ex_id)

    wf_ex_db = matches[0]

    # Reject the request based on the stored status.
    if wf_ex_db.status in states.COMPLETED_STATES:
        raise wf_exc.WorkflowExecutionIsCompletedException(str(wf_ex_db.id))

    if wf_ex_db.status in states.RUNNING_STATES:
        raise wf_exc.WorkflowExecutionIsRunningException(str(wf_ex_db.id))

    conductor = deserialize_conductor(wf_ex_db)

    # Reject the request based on the state reported by the conductor.
    if conductor.get_workflow_state() in states.COMPLETED_STATES:
        raise wf_exc.WorkflowExecutionIsCompletedException(str(wf_ex_db.id))

    if conductor.get_workflow_state() in states.RUNNING_STATES:
        raise wf_exc.WorkflowExecutionIsRunningException(str(wf_ex_db.id))

    conductor.set_workflow_state(states.RESUMING)

    # Persist the new workflow state and task flow; publishing is done separately.
    wf_ex_db.status = conductor.get_workflow_state()
    wf_ex_db.flow = conductor.flow.serialize()
    wf_ex_db = wf_db_access.WorkflowExecution.update(wf_ex_db, publish=False)

    # Announce the status change.
    wf_db_access.WorkflowExecution.publish_status(wf_ex_db)

    return wf_ex_db
162
163
164
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
def request_cancellation(ac_ex_db):
    """Set the workflow execution to CANCELED and cascade to the root execution.

    :param ac_ex_db: Action execution record whose workflow is to be canceled.
    :returns: The updated workflow execution record.
    :raises WorkflowExecutionNotFoundException: If no workflow execution matches.
    :raises AmbiguousWorkflowExecutionException: If more than one matches.
    :raises WorkflowExecutionIsCompletedException: If the record or the
        deserialized conductor is already in a completed state.
    """
    ac_ex_id = str(ac_ex_db.id)
    matches = wf_db_access.WorkflowExecution.query(action_execution=ac_ex_id)

    # Exactly one workflow execution is expected per action execution.
    if not matches:
        raise wf_exc.WorkflowExecutionNotFoundException(ac_ex_id)

    if len(matches) > 1:
        raise wf_exc.AmbiguousWorkflowExecutionException(ac_ex_id)

    wf_ex_db = matches[0]

    # Check the stored status first, before deserializing the conductor.
    if wf_ex_db.status in states.COMPLETED_STATES:
        raise wf_exc.WorkflowExecutionIsCompletedException(str(wf_ex_db.id))

    conductor = deserialize_conductor(wf_ex_db)

    # Check the state again as reported by the conductor itself.
    if conductor.get_workflow_state() in states.COMPLETED_STATES:
        raise wf_exc.WorkflowExecutionIsCompletedException(str(wf_ex_db.id))

    conductor.set_workflow_state(states.CANCELED)

    # Persist the new workflow state and task flow without publishing.
    wf_ex_db.status = conductor.get_workflow_state()
    wf_ex_db.flow = conductor.flow.serialize()
    wf_ex_db = wf_db_access.WorkflowExecution.update(wf_ex_db, publish=False)

    # Propagate the cancellation to the root execution, unless this execution is
    # the root or the root is already in one of the cancel states.
    root_ac_ex_db = ac_svc.get_root_execution(ac_ex_db)

    if (root_ac_ex_db != ac_ex_db and
            root_ac_ex_db.status not in ac_const.LIVEACTION_CANCEL_STATES):
        root_lv_ac_id = root_ac_ex_db.liveaction['id']
        ac_svc.request_cancellation(lv_db_access.LiveAction.get(id=root_lv_ac_id), None)

    return wf_ex_db
199
200
201
def request_task_execution(wf_ex_db, task_id, task_spec, task_ctx, st2_ctx):
    """Create a task execution record and request the action execution for it.

    A task execution record is inserted in the REQUESTED state, the action
    execution is requested, and the task execution is then moved to RUNNING.
    If anything fails after the record is inserted, the task execution is
    marked as FAILED with the error message and the exception is re-raised.

    :param wf_ex_db: Workflow execution record the task belongs to.
    :param task_id: Identifier of the task in the workflow.
    :param task_spec: Task spec; its ``action``, ``name`` and optional ``input``
        attributes are read and it is serialized into the task execution record.
    :param task_ctx: Initial context stored on the task execution record.
    :param st2_ctx: Context stored under ``parent`` in the action execution context.
    :returns: The task execution record in the RUNNING state.
    :raises InvalidActionReferencedException: If the task's action is not found.
    """
    # Identify the action to execute.
    action_db = ac_db_util.get_action_by_ref(ref=task_spec.action)

    if not action_db:
        error = 'Unable to find action "%s".' % task_spec.action
        raise ac_exc.InvalidActionReferencedException(error)

    # Identify the runner for the action.
    runner_type_db = ac_db_util.get_runnertype_by_name(action_db.runner_type['name'])

    # Fall back to the task id when the spec does not name the task.
    task_name = task_spec.name or task_id

    # Create a record for task execution.
    task_ex_db = wf_db_models.TaskExecutionDB(
        workflow_execution=str(wf_ex_db.id),
        task_name=task_name,
        task_id=task_id,
        task_spec=task_spec.serialize(),
        initial_context=task_ctx,
        status=states.REQUESTED
    )

    # Insert new record into the database.
    task_ex_db = wf_db_access.TaskExecution.insert(task_ex_db, publish=False)

    try:
        # Set context for the action execution.
        ac_ex_ctx = {
            'parent': st2_ctx,
            'orchestra': {
                'workflow_execution_id': str(wf_ex_db.id),
                'task_execution_id': str(task_ex_db.id),
                'task_name': task_name,
                'task_id': task_id
            }
        }

        # Render action execution parameters and setup action execution object.
        ac_ex_params = param_utils.render_live_params(
            runner_type_db.runner_parameters or {},
            action_db.parameters or {},
            getattr(task_spec, 'input', None) or {},
            ac_ex_ctx
        )

        lv_ac_db = lv_db_models.LiveActionDB(
            action=task_spec.action,
            workflow_execution=str(wf_ex_db.id),
            task_execution=str(task_ex_db.id),
            context=ac_ex_ctx,
            parameters=ac_ex_params
        )

        # Request action execution.
        ac_svc.request(lv_ac_db)

        # Set the task execution to running.
        task_ex_db.status = states.RUNNING
        task_ex_db = wf_db_access.TaskExecution.update(task_ex_db, publish=False)
    except Exception as e:
        # Record the failure on the task execution before propagating the error.
        result = {'errors': [{'message': str(e), 'task_id': task_ex_db.task_id}]}
        update_task_execution(str(task_ex_db.id), states.FAILED, result)
        # Bare raise re-raises the active exception with its original traceback.
        raise

    return task_ex_db
265
266
267
def handle_action_execution_pause(ac_ex_db):
    """Propagate a paused action execution to its task execution and task flow.

    :param ac_ex_db: Action execution record; must be in the paused status and
        carry an ``orchestra`` entry with ``task_execution_id`` in its context.
    :raises Exception: If the action execution is not in the paused status.
    """
    # Only a paused action execution can be handled here.
    if ac_ex_db.status != ac_const.LIVEACTION_STATUS_PAUSED:
        message = (
            'Unable to handle pause of action execution. The action execution '
            '"%s" is in "%s" state.'
        )
        raise Exception(message % (str(ac_ex_db.id), ac_ex_db.status))

    # Identify the related task execution.
    task_ex_id = ac_ex_db.context['orchestra']['task_execution_id']

    # Update the task execution with the action execution status.
    update_task_execution(task_ex_id, ac_ex_db.status)

    # Reflect the change in the task flow of the workflow execution, unpublished.
    update_task_flow(task_ex_id, publish=False)
283
284
285
def handle_action_execution_resume(ac_ex_db):
    """Resume the task and workflow execution of an action execution.

    The task execution and the workflow execution are set to running. If the
    action execution has a parent, a paused parent is set to running without
    publishing, and the resume is handled recursively for the parent when the
    parent itself is an orchestra task with a parent.

    :param ac_ex_db: Action execution record being resumed.
    :raises Exception: If the action execution context has no ``orchestra`` entry.
    """
    ctx = ac_ex_db.context

    if 'orchestra' not in ctx:
        message = (
            'Unable to handle resume of action execution. The action execution '
            '%s is not an orchestra workflow task.'
        )
        raise Exception(message % str(ac_ex_db.id))

    orchestra_ctx = ctx['orchestra']
    wf_ex_id = orchestra_ctx['workflow_execution_id']
    task_ex_id = orchestra_ctx['task_execution_id']

    # Update task execution to running.
    resume_task_execution(task_ex_id)

    # Update workflow execution to running.
    resume_workflow_execution(wf_ex_id, task_ex_id)

    # Nothing to cascade when there is no parent action execution.
    if 'parent' not in ctx:
        return

    # Cascade the status change upstream. The change is not published because we
    # do not want to trigger resume of other peer subworkflows.
    parent_ac_ex_db = ex_db_access.ActionExecution.get_by_id(ctx['parent']['execution_id'])

    if parent_ac_ex_db.status == ac_const.LIVEACTION_STATUS_PAUSED:
        ac_db_util.update_liveaction_status(
            liveaction_id=parent_ac_ex_db.liveaction['id'],
            status=ac_const.LIVEACTION_STATUS_RUNNING,
            publish=False)

    # If there are grandparents, handle the resume of the parent action execution.
    parent_ctx = parent_ac_ex_db.context

    if 'orchestra' in parent_ctx and 'parent' in parent_ctx:
        handle_action_execution_resume(parent_ac_ex_db)
316
317
318
def handle_action_execution_completion(ac_ex_db):
    """Propagate a completed action execution to its task and workflow execution.

    :param ac_ex_db: Action execution record; must be in a completed status and
        carry an ``orchestra`` entry in its context.
    :raises Exception: If the action execution is not in a completed status.
    """
    # Only a completed action execution can be handled here.
    if ac_ex_db.status not in ac_const.LIVEACTION_COMPLETED_STATES:
        message = (
            'Unable to handle completion of action execution. The action execution '
            '"%s" is in "%s" state.'
        )
        raise Exception(message % (str(ac_ex_db.id), ac_ex_db.status))

    # Identify the related workflow and task executions.
    orchestra_ctx = ac_ex_db.context['orchestra']
    wf_ex_id = orchestra_ctx['workflow_execution_id']
    task_ex_id = orchestra_ctx['task_execution_id']

    # Record the status and result on the task execution.
    update_task_execution(task_ex_id, ac_ex_db.status, ac_ex_db.result)

    # Request the next set of tasks, if any.
    request_next_tasks(task_ex_id)

    # Update the workflow execution if it is completed.
    update_workflow_execution(wf_ex_id)
338
339
340
def deserialize_conductor(wf_ex_db):
    """Rebuild a workflow conductor from a workflow execution record.

    :param wf_ex_db: Workflow execution record providing spec, graph, status,
        flow, input, output and errors.
    :returns: The result of ``WorkflowConductor.deserialize``.
    """
    # The record's status is passed to the conductor under the 'state' key.
    return conducting.WorkflowConductor.deserialize({
        'spec': wf_ex_db.spec,
        'graph': wf_ex_db.graph,
        'state': wf_ex_db.status,
        'flow': wf_ex_db.flow,
        'input': wf_ex_db.input,
        'output': wf_ex_db.output,
        'errors': wf_ex_db.errors
    })
352
353
354
def refresh_conductor(wf_ex_id):
    """Load a workflow execution by id and deserialize its conductor.

    :param wf_ex_id: Identifier of the workflow execution record.
    :returns: Tuple of (conductor, workflow execution record).
    """
    record = wf_db_access.WorkflowExecution.get_by_id(wf_ex_id)

    return deserialize_conductor(record), record
359
360
361
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
def update_task_flow(task_ex_id, publish=True):
    """Update the workflow task flow for a completed or paused task execution.

    Does nothing when the task execution is neither completed nor paused.

    :param task_ex_id: Identifier of the task execution record.
    :param publish: Passed as both ``pub_lv_ac`` and ``pub_ac_ex`` to
        ``update_execution_records``.
    """
    task_ex_db = wf_db_access.TaskExecution.get_by_id(task_ex_id)

    # States that are relevant both for the task and for the liveaction update.
    handled_states = states.COMPLETED_STATES + [states.PAUSED]

    if task_ex_db.status not in handled_states:
        return

    # Apply the task status and result to the task flow of the workflow.
    conductor, wf_ex_db = refresh_conductor(task_ex_db.workflow_execution)
    conductor.update_task_flow(task_ex_db.task_id, task_ex_db.status, result=task_ex_db.result)

    # Update workflow execution and related liveaction and action execution.
    update_execution_records(
        wf_ex_db,
        conductor,
        update_lv_ac_on_states=handled_states,
        pub_lv_ac=publish,
        pub_ac_ex=publish
    )
381
382
383
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
def request_next_tasks(task_ex_id):
    """Request execution of the tasks that follow a completed task execution.

    Does nothing when the task execution is not completed. If requesting any
    of the next tasks fails, the workflow execution is failed and the
    remaining tasks are not requested.

    :param task_ex_id: Identifier of the completed task execution record.
    """
    task_ex_db = wf_db_access.TaskExecution.get_by_id(task_ex_id)

    # Nothing to do unless the task execution is completed.
    if task_ex_db.status not in states.COMPLETED_STATES:
        return

    # Apply the completed task status and result to the task flow.
    conductor, wf_ex_db = refresh_conductor(task_ex_db.workflow_execution)
    conductor.update_task_flow(task_ex_db.task_id, task_ex_db.status, result=task_ex_db.result)

    # Identify the next set of tasks.
    upcoming = conductor.get_next_tasks(task_ex_db.task_id)

    # Mark the next tasks as running in the task flow before they are
    # actually requested.
    for upcoming_task in upcoming:
        conductor.update_task_flow(upcoming_task['id'], states.RUNNING)

    # Update workflow execution and related liveaction and action execution.
    update_execution_records(wf_ex_db, conductor)

    # Request task execution for each of the next tasks.
    for upcoming_task in upcoming:
        try:
            st2_ctx = {'execution_id': wf_ex_db.action_execution}
            request_task_execution(
                wf_ex_db,
                upcoming_task['id'],
                upcoming_task['spec'],
                upcoming_task['ctx'],
                st2_ctx
            )
        except Exception as e:
            # Fail the whole workflow and stop requesting further tasks.
            fail_workflow_execution(str(wf_ex_db.id), e, task_id=upcoming_task['id'])
            break
414
415
416
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
def update_task_execution(task_ex_id, ac_ex_status, ac_ex_result=None):
    """Write a completed or paused action execution status to a task execution.

    Does nothing when the status is neither completed nor paused.

    :param task_ex_id: Identifier of the task execution record.
    :param ac_ex_status: Status to store on the task execution.
    :param ac_ex_result: Result to store; a falsy value keeps the existing result.
    """
    if ac_ex_status not in states.COMPLETED_STATES + [states.PAUSED]:
        return

    record = wf_db_access.TaskExecution.get_by_id(task_ex_id)
    record.status = ac_ex_status

    # Keep the result already on the record when no new result is supplied.
    record.result = ac_ex_result or record.result

    # The end timestamp is only set for completed states, not for paused.
    if ac_ex_status in states.COMPLETED_STATES:
        record.end_timestamp = date_utils.get_datetime_utc_now()

    wf_db_access.TaskExecution.update(record, publish=False)
429
430
431
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
def resume_task_execution(task_ex_id):
    """Set a task execution to RUNNING and write it to the database unpublished.

    :param task_ex_id: Identifier of the task execution record.
    """
    record = wf_db_access.TaskExecution.get_by_id(task_ex_id)
    record.status = states.RUNNING

    wf_db_access.TaskExecution.update(record, publish=False)
439
440
441
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
def update_workflow_execution(wf_ex_id):
    """Update execution records when the workflow is completed or paused.

    :param wf_ex_id: Identifier of the workflow execution record.
    """
    conductor, wf_ex_db = refresh_conductor(wf_ex_id)

    # There is nothing to update if the workflow is neither completed nor paused.
    if conductor.get_workflow_state() not in states.COMPLETED_STATES + [states.PAUSED]:
        return

    # Update workflow execution and related liveaction and action execution.
    update_execution_records(wf_ex_db, conductor)
449
450
451
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
def resume_workflow_execution(wf_ex_id, task_ex_id):
    """Set the workflow and the given task in its task flow to RUNNING.

    :param wf_ex_id: Identifier of the workflow execution record.
    :param task_ex_id: Identifier of the task execution being resumed.
    """
    # Load the current conductor state and the task being resumed.
    conductor, wf_ex_db = refresh_conductor(wf_ex_id)
    resumed_task = wf_db_access.TaskExecution.get_by_id(task_ex_id)

    # Move the workflow, then the task in the task flow, to running.
    conductor.set_workflow_state(states.RUNNING)
    conductor.update_task_flow(resumed_task.task_id, states.RUNNING)

    # Update workflow execution and related liveaction and action execution.
    update_execution_records(wf_ex_db, conductor)
463
464
465
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
def fail_workflow_execution(wf_ex_id, exception, task_id=None):
    """Mark a workflow execution as FAILED and log the error on the conductor.

    :param wf_ex_id: Identifier of the workflow execution record.
    :param exception: Exception whose string form is logged as the error.
    :param task_id: Optional task identifier passed along with the logged error.
    """
    conductor, wf_ex_db = refresh_conductor(wf_ex_id)

    # Fail the workflow and record what went wrong.
    error_message = str(exception)
    conductor.set_workflow_state(states.FAILED)
    conductor.log_error(error_message, task_id=task_id)

    # Update workflow execution and related liveaction and action execution.
    update_execution_records(wf_ex_db, conductor)
475
476
477
def update_execution_records(wf_ex_db, conductor, update_lv_ac_on_states=None,
                             pub_wf_ex=False, pub_lv_ac=True, pub_ac_ex=True):
    """Persist conductor state to the workflow execution and sync related records.

    The workflow execution is always updated with the conductor's state,
    errors and task flow. The corresponding liveaction and action execution
    are then updated, unless ``update_lv_ac_on_states`` is given and the
    workflow status is not one of those states.

    :param wf_ex_db: Workflow execution record to update.
    :param conductor: Workflow conductor holding the current state.
    :param update_lv_ac_on_states: Optional collection of workflow states (list,
        tuple, set, ...). When None, the liveaction and action execution are
        always updated.
    :param pub_wf_ex: Whether to publish the workflow execution update.
    :param pub_lv_ac: Whether to publish the liveaction status update.
    :param pub_ac_ex: Whether to publish the action execution update.
    """
    # Update timestamp and output if workflow is completed.
    if conductor.get_workflow_state() in states.COMPLETED_STATES:
        wf_ex_db.end_timestamp = date_utils.get_datetime_utc_now()
        wf_ex_db.output = conductor.get_workflow_output()

    # Update workflow status and task flow and write changes to database.
    wf_ex_db.status = conductor.get_workflow_state()
    # Deep copy so the record does not share the conductor's error list.
    wf_ex_db.errors = copy.deepcopy(conductor.errors)
    wf_ex_db.flow = conductor.flow.serialize()
    wf_ex_db = wf_db_access.WorkflowExecution.update(wf_ex_db, publish=pub_wf_ex)

    # Return if a state filter is given and the workflow status is not in it.
    # Any container is accepted so a tuple or set is not silently ignored.
    if update_lv_ac_on_states is not None and wf_ex_db.status not in update_lv_ac_on_states:
        return

    # Update the corresponding liveaction and action execution for the workflow.
    wf_ac_ex_db = ex_db_access.ActionExecution.get_by_id(wf_ex_db.action_execution)
    wf_lv_ac_db = ac_db_util.get_liveaction_by_id(wf_ac_ex_db.liveaction['id'])

    # Gather result for liveaction and action execution.
    result = {'output': wf_ex_db.output or None}

    # Errors are only included in the result for abended states.
    if wf_ex_db.status in states.ABENDED_STATES:
        result['errors'] = wf_ex_db.errors

    # Sync update with corresponding liveaction and action execution.
    wf_lv_ac_db = ac_db_util.update_liveaction_status(
        status=wf_ex_db.status,
        result=result,
        end_timestamp=wf_ex_db.end_timestamp,
        liveaction_db=wf_lv_ac_db,
        publish=pub_lv_ac)

    ex_svc.update_execution(wf_lv_ac_db, publish=pub_ac_ex)
514