Test Setup Failed
Pull Request — master (#4154)
by W
04:10
created

update_execution_records()   B

Complexity

Conditions 5

Size

Total Lines 37

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 5
dl 0
loc 37
rs 8.0894
c 1
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
18
import copy
19
import retrying
20
21
from orchestra import conducting
22
from orchestra.specs import loader as specs_loader
23
from orchestra import states
24
25
from st2common.constants import action as ac_const
26
from st2common.exceptions import action as ac_exc
27
from st2common.exceptions import workflow as wf_exc
28
from st2common import log as logging
29
from st2common.models.db import liveaction as lv_db_models
30
from st2common.models.db import workflow as wf_db_models
31
from st2common.persistence import liveaction as lv_db_access
32
from st2common.persistence import execution as ex_db_access
33
from st2common.persistence import workflow as wf_db_access
34
from st2common.services import action as ac_svc
35
from st2common.services import executions as ex_svc
36
from st2common.util import action_db as ac_db_util
37
from st2common.util import date as date_utils
38
from st2common.util import param as param_utils
39
40
41
LOG = logging.getLogger(__name__)
42
43
44
def request(wf_def, ac_ex_db):
    """Create the workflow execution record for a requested action execution.

    :param wf_def: Workflow definition handed to the native spec module.
    :param ac_ex_db: Action execution providing the action ref, parameters and context.
    :returns: The workflow execution record returned by the insert call.
    :raises InvalidActionReferencedException: If the action ref cannot be resolved.
    """
    # Turn the definition into a spec; inspection raises on any problem found.
    native_specs = specs_loader.get_spec_module('native')
    wf_spec = native_specs.instantiate(wf_def)
    wf_spec.inspect(raise_exception=True)

    # Resolve the action and bail out early when it does not exist.
    action_ref = ac_ex_db.action['ref']
    action_db = ac_db_util.get_action_by_ref(ref=action_ref)

    if not action_db:
        raise ac_exc.InvalidActionReferencedException(
            'Unable to find action "%s".' % action_ref
        )

    # Only the rendered action parameters are used to seed the conductor.
    runner_type_db = ac_db_util.get_runnertype_by_name(action_db.runner_type['name'])
    _, action_params = param_utils.render_final_params(
        runner_type_db.runner_parameters,
        action_db.parameters,
        ac_ex_db.parameters,
        ac_ex_db.context
    )

    conductor = conducting.WorkflowConductor(wf_spec, **action_params)
    conductor.set_workflow_state(states.REQUESTED)

    # Serializing the conductor also initializes some of its internal values.
    serialized = conductor.serialize()
    copied_fields = ('spec', 'graph', 'flow', 'input', 'output', 'errors')
    record_kwargs = {name: serialized[name] for name in copied_fields}

    new_record = wf_db_models.WorkflowExecutionDB(
        action_execution=str(ac_ex_db.id),
        status=serialized['state'],
        **record_kwargs
    )

    # Insert the record with publishing enabled and hand back the stored copy.
    return wf_db_access.WorkflowExecution.insert(new_record, publish=True)
93
94
95
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
def request_pause(ac_ex_db):
    """Move the workflow execution of the given action execution to PAUSED.

    :param ac_ex_db: Action execution whose id identifies the workflow execution.
    :returns: The workflow execution record returned by the update call.
    :raises WorkflowExecutionNotFoundException: If no workflow execution matches.
    :raises AmbiguousWorkflowExecutionException: If more than one matches.
    :raises WorkflowExecutionIsCompletedException: If the stored status or the
        conductor state is already a completed state.
    """
    ac_ex_id = str(ac_ex_db.id)
    matches = wf_db_access.WorkflowExecution.query(action_execution=ac_ex_id)

    # Exactly one workflow execution is expected per action execution.
    if not matches:
        raise wf_exc.WorkflowExecutionNotFoundException(ac_ex_id)

    if len(matches) > 1:
        raise wf_exc.AmbiguousWorkflowExecutionException(ac_ex_id)

    wf_ex_db = matches[0]

    # Check the stored status before paying for conductor deserialization.
    if wf_ex_db.status in states.COMPLETED_STATES:
        raise wf_exc.WorkflowExecutionIsCompletedException(str(wf_ex_db.id))

    conductor = deserialize_conductor(wf_ex_db)

    if conductor.get_workflow_state() in states.COMPLETED_STATES:
        raise wf_exc.WorkflowExecutionIsCompletedException(str(wf_ex_db.id))

    conductor.set_workflow_state(states.PAUSED)

    # Copy the resulting state and task flow onto the record and save it
    # without publishing.
    wf_ex_db.status = conductor.get_workflow_state()
    wf_ex_db.flow = conductor.flow.serialize()

    return wf_db_access.WorkflowExecution.update(wf_ex_db, publish=False)
123
124
125
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
def request_resume(ac_ex_db):
    """Move the workflow execution of the given action execution to RESUMING.

    :param ac_ex_db: Action execution whose id identifies the workflow execution.
    :returns: The workflow execution record returned by the update call.
    :raises WorkflowExecutionNotFoundException: If no workflow execution matches.
    :raises AmbiguousWorkflowExecutionException: If more than one matches.
    :raises WorkflowExecutionIsCompletedException: If the stored status or the
        conductor state is a completed state.
    :raises WorkflowExecutionIsRunningException: If the stored status or the
        conductor state is a running state.
    """
    ac_ex_id = str(ac_ex_db.id)
    matches = wf_db_access.WorkflowExecution.query(action_execution=ac_ex_id)

    # Exactly one workflow execution is expected per action execution.
    if not matches:
        raise wf_exc.WorkflowExecutionNotFoundException(ac_ex_id)

    if len(matches) > 1:
        raise wf_exc.AmbiguousWorkflowExecutionException(ac_ex_id)

    wf_ex_db = matches[0]

    # Validate the stored status first, completed before running.
    if wf_ex_db.status in states.COMPLETED_STATES:
        raise wf_exc.WorkflowExecutionIsCompletedException(str(wf_ex_db.id))

    if wf_ex_db.status in states.RUNNING_STATES:
        raise wf_exc.WorkflowExecutionIsRunningException(str(wf_ex_db.id))

    # Repeat the same validation against the deserialized conductor state.
    conductor = deserialize_conductor(wf_ex_db)

    if conductor.get_workflow_state() in states.COMPLETED_STATES:
        raise wf_exc.WorkflowExecutionIsCompletedException(str(wf_ex_db.id))

    if conductor.get_workflow_state() in states.RUNNING_STATES:
        raise wf_exc.WorkflowExecutionIsRunningException(str(wf_ex_db.id))

    conductor.set_workflow_state(states.RESUMING)

    # Save the new state and task flow without publishing on update ...
    wf_ex_db.status = conductor.get_workflow_state()
    wf_ex_db.flow = conductor.flow.serialize()
    updated_wf_ex_db = wf_db_access.WorkflowExecution.update(wf_ex_db, publish=False)

    # ... then publish the status change explicitly.
    wf_db_access.WorkflowExecution.publish_status(updated_wf_ex_db)

    return updated_wf_ex_db
162
163
164
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
def request_cancellation(ac_ex_db):
    """Cancel the workflow execution of the given action execution.

    After the workflow execution is saved as CANCELED, cancellation is also
    requested for the root action execution when it is a different execution
    that is not already in one of the liveaction cancel states.

    :param ac_ex_db: Action execution whose id identifies the workflow execution.
    :returns: The workflow execution record returned by the update call.
    :raises WorkflowExecutionNotFoundException: If no workflow execution matches.
    :raises AmbiguousWorkflowExecutionException: If more than one matches.
    :raises WorkflowExecutionIsCompletedException: If the stored status or the
        conductor state is already a completed state.
    """
    ac_ex_id = str(ac_ex_db.id)
    matches = wf_db_access.WorkflowExecution.query(action_execution=ac_ex_id)

    # Exactly one workflow execution is expected per action execution.
    if not matches:
        raise wf_exc.WorkflowExecutionNotFoundException(ac_ex_id)

    if len(matches) > 1:
        raise wf_exc.AmbiguousWorkflowExecutionException(ac_ex_id)

    wf_ex_db = matches[0]

    # Check the stored status before deserializing the conductor.
    if wf_ex_db.status in states.COMPLETED_STATES:
        raise wf_exc.WorkflowExecutionIsCompletedException(str(wf_ex_db.id))

    conductor = deserialize_conductor(wf_ex_db)

    if conductor.get_workflow_state() in states.COMPLETED_STATES:
        raise wf_exc.WorkflowExecutionIsCompletedException(str(wf_ex_db.id))

    conductor.set_workflow_state(states.CANCELED)

    # Save the new state and task flow without publishing.
    wf_ex_db.status = conductor.get_workflow_state()
    wf_ex_db.flow = conductor.flow.serialize()
    wf_ex_db = wf_db_access.WorkflowExecution.update(wf_ex_db, publish=False)

    # Propagate the cancellation to the root execution when applicable.
    root_ac_ex_db = ac_svc.get_root_execution(ac_ex_db)
    cascade_to_root = (
        root_ac_ex_db != ac_ex_db and
        root_ac_ex_db.status not in ac_const.LIVEACTION_CANCEL_STATES
    )

    if cascade_to_root:
        root_lv_ac_db = lv_db_access.LiveAction.get(id=root_ac_ex_db.liveaction['id'])
        ac_svc.request_cancellation(root_lv_ac_db, None)

    return wf_ex_db
199
200
201
def request_task_execution(wf_ex_db, task_id, task_spec, task_ctx, st2_ctx):
    """Create a task execution record and request the action execution for it.

    When the task spec has no action, the task execution is set to RUNNING and
    then fast forwarded to SUCCEEDED, and the refreshed record is returned.
    Otherwise a liveaction is built from the task spec and requested, and the
    task execution is set to RUNNING.

    :param wf_ex_db: Workflow execution record that owns the task.
    :param task_id: Identifier of the task; also used as the task name when the
        spec has no name.
    :param task_spec: Task spec providing ``name``, ``action``, ``serialize()``
        and optionally ``input``.
    :param task_ctx: Stored as the initial context of the task execution.
    :param st2_ctx: Stored under the ``parent`` key of the action execution context.
    :returns: The task execution record.
    :raises InvalidActionReferencedException: If the task's action is not found.
    :raises Exception: Any error raised while requesting the task is recorded on
        the task execution as FAILED and then re-raised unchanged.
    """
    task_name = task_spec.name or task_id

    # Create a record for task execution.
    task_ex_db = wf_db_models.TaskExecutionDB(
        workflow_execution=str(wf_ex_db.id),
        task_name=task_name,
        task_id=task_id,
        task_spec=task_spec.serialize(),
        initial_context=task_ctx,
        status=states.REQUESTED
    )

    # Insert new record into the database.
    task_ex_db = wf_db_access.TaskExecution.insert(task_ex_db, publish=False)

    try:
        # Return here if no action is specified in task spec.
        if task_spec.action is None:
            # Set the task execution to running.
            task_ex_db.status = states.RUNNING
            task_ex_db = wf_db_access.TaskExecution.update(task_ex_db, publish=False)

            # Fast forward task execution to completion.
            update_task_execution(str(task_ex_db.id), states.SUCCEEDED)
            update_task_flow(str(task_ex_db.id), publish=False)

            # Refresh and return the task execution.
            return wf_db_access.TaskExecution.get_by_id(str(task_ex_db.id))

        # Identify the action to execute.
        action_db = ac_db_util.get_action_by_ref(ref=task_spec.action)

        if not action_db:
            error = 'Unable to find action "%s".' % task_spec.action
            raise ac_exc.InvalidActionReferencedException(error)

        # Identify the runner for the action.
        runner_type_db = ac_db_util.get_runnertype_by_name(action_db.runner_type['name'])

        # Set context for the action execution.
        ac_ex_ctx = {
            'parent': st2_ctx,
            'orchestra': {
                'workflow_execution_id': str(wf_ex_db.id),
                'task_execution_id': str(task_ex_db.id),
                'task_name': task_name,
                'task_id': task_id
            }
        }

        # Render action execution parameters and setup action execution object.
        ac_ex_params = param_utils.render_live_params(
            runner_type_db.runner_parameters or {},
            action_db.parameters or {},
            getattr(task_spec, 'input', None) or {},
            ac_ex_ctx
        )

        lv_ac_db = lv_db_models.LiveActionDB(
            action=task_spec.action,
            workflow_execution=str(wf_ex_db.id),
            task_execution=str(task_ex_db.id),
            context=ac_ex_ctx,
            parameters=ac_ex_params
        )

        # Request action execution.
        ac_svc.request(lv_ac_db)

        # Set the task execution to running.
        task_ex_db.status = states.RUNNING
        task_ex_db = wf_db_access.TaskExecution.update(task_ex_db, publish=False)
    except Exception as e:
        # Record the failure on the task execution before propagating it.
        result = {'errors': [{'message': str(e), 'task_id': task_ex_db.task_id}]}
        update_task_execution(str(task_ex_db.id), states.FAILED, result)

        # A bare raise keeps the original traceback; ``raise e`` would reset it
        # on Python 2 and add a redundant frame on Python 3.
        raise

    return task_ex_db
278
279
280
def handle_action_execution_pause(ac_ex_db):
    """Record the pause of an action execution on its task execution and task flow.

    :param ac_ex_db: Action execution; its context must carry the
        ``orchestra`` -> ``task_execution_id`` entry.
    :raises Exception: If the action execution is not in the paused status.
    """
    # Only a paused action execution can be handled here.
    if ac_ex_db.status != ac_const.LIVEACTION_STATUS_PAUSED:
        message = (
            'Unable to handle pause of action execution. The action execution '
            '"%s" is in "%s" state.' % (str(ac_ex_db.id), ac_ex_db.status)
        )
        raise Exception(message)

    orchestra_ctx = ac_ex_db.context['orchestra']
    task_ex_id = orchestra_ctx['task_execution_id']

    # Update the task execution first, then reflect it in the workflow's task
    # flow without publishing.
    update_task_execution(task_ex_id, ac_ex_db.status)
    update_task_flow(task_ex_id, publish=False)
296
297
298
def handle_action_execution_resume(ac_ex_db):
    """Set the task and workflow executions of a resumed action execution to running.

    When the action execution has a parent, a paused parent liveaction is set
    back to running without publishing, and the function recurses into the
    parent if the parent itself is an orchestra task with a parent.

    :param ac_ex_db: Action execution being resumed.
    :raises Exception: If the action execution context has no ``orchestra`` entry.
    """
    if 'orchestra' not in ac_ex_db.context:
        message = (
            'Unable to handle resume of action execution. The action execution '
            '%s is not an orchestra workflow task.' % str(ac_ex_db.id)
        )
        raise Exception(message)

    orchestra_ctx = ac_ex_db.context['orchestra']
    wf_ex_id = orchestra_ctx['workflow_execution_id']
    task_ex_id = orchestra_ctx['task_execution_id']

    # Task execution goes back to running first, then the workflow execution.
    resume_task_execution(task_ex_id)
    resume_workflow_execution(wf_ex_id, task_ex_id)

    # Nothing more to do for an execution without a parent.
    if 'parent' not in ac_ex_db.context:
        return

    parent_ac_ex_id = ac_ex_db.context['parent']['execution_id']
    parent_ac_ex_db = ex_db_access.ActionExecution.get_by_id(parent_ac_ex_id)

    # Cascade the status change upstream without publishing, so that the resume
    # of other peer subworkflows is not triggered.
    if parent_ac_ex_db.status == ac_const.LIVEACTION_STATUS_PAUSED:
        ac_db_util.update_liveaction_status(
            liveaction_id=parent_ac_ex_db.liveaction['id'],
            status=ac_const.LIVEACTION_STATUS_RUNNING,
            publish=False)

    # Continue with the parent when there are grandparents to update.
    parent_ctx = parent_ac_ex_db.context
    if 'orchestra' in parent_ctx and 'parent' in parent_ctx:
        handle_action_execution_resume(parent_ac_ex_db)
329
330
331
def handle_action_execution_completion(ac_ex_db):
    """Process a completed action execution for its task and workflow executions.

    :param ac_ex_db: Action execution; its context must carry the ``orchestra``
        entry with ``workflow_execution_id`` and ``task_execution_id``.
    :raises Exception: If the action execution is not in a completed status.
    """
    # Only a completed action execution can be handled here.
    if ac_ex_db.status not in ac_const.LIVEACTION_COMPLETED_STATES:
        message = (
            'Unable to handle completion of action execution. The action execution '
            '"%s" is in "%s" state.' % (str(ac_ex_db.id), ac_ex_db.status)
        )
        raise Exception(message)

    orchestra_ctx = ac_ex_db.context['orchestra']
    wf_ex_id = orchestra_ctx['workflow_execution_id']
    task_ex_id = orchestra_ctx['task_execution_id']

    # Order matters: store the task outcome, request what follows it, then
    # update the workflow execution itself.
    update_task_execution(task_ex_id, ac_ex_db.status, ac_ex_db.result)
    request_next_tasks(task_ex_id)
    update_workflow_execution(wf_ex_id)
351
352
353
def deserialize_conductor(wf_ex_db):
    """Rebuild a workflow conductor from a workflow execution record.

    :param wf_ex_db: Workflow execution record; its ``status`` is supplied to
        the conductor under the ``state`` key.
    :returns: The object returned by ``WorkflowConductor.deserialize``.
    """
    serialized = dict(
        spec=wf_ex_db.spec,
        graph=wf_ex_db.graph,
        state=wf_ex_db.status,
        flow=wf_ex_db.flow,
        input=wf_ex_db.input,
        output=wf_ex_db.output,
        errors=wf_ex_db.errors
    )

    return conducting.WorkflowConductor.deserialize(serialized)
365
366
367
def refresh_conductor(wf_ex_id):
    """Load a workflow execution by id and rebuild its conductor.

    :param wf_ex_id: Identifier of the workflow execution to load.
    :returns: Tuple of ``(conductor, wf_ex_db)``.
    """
    wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(wf_ex_id)

    return deserialize_conductor(wf_ex_db), wf_ex_db
372
373
374
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
def update_task_flow(task_ex_id, publish=True):
    """Apply a completed or paused task execution to the workflow's task flow.

    Does nothing when the task execution is neither completed nor paused.

    :param task_ex_id: Identifier of the task execution.
    :param publish: Passed as both ``pub_lv_ac`` and ``pub_ac_ex`` when the
        execution records are updated.
    """
    task_ex_db = wf_db_access.TaskExecution.get_by_id(task_ex_id)
    handled_states = states.COMPLETED_STATES + [states.PAUSED]

    if task_ex_db.status in handled_states:
        # Feed the task outcome into a freshly loaded conductor.
        conductor, wf_ex_db = refresh_conductor(task_ex_db.workflow_execution)
        conductor.update_task_flow(
            task_ex_db.task_id,
            task_ex_db.status,
            result=task_ex_db.result
        )

        # Save the workflow execution; the liveaction and action execution are
        # only updated when the workflow is in one of the handled states.
        update_execution_records(
            wf_ex_db,
            conductor,
            update_lv_ac_on_states=handled_states,
            pub_lv_ac=publish,
            pub_ac_ex=publish
        )
394
395
396
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
def request_next_tasks(task_ex_id):
    """Request the tasks that follow a completed task execution.

    Does nothing when the task execution is not completed. If requesting any
    task raises, the workflow execution is failed and processing stops.

    :param task_ex_id: Identifier of the task execution that just finished.
    """
    task_ex_db = wf_db_access.TaskExecution.get_by_id(task_ex_id)

    if task_ex_db.status not in states.COMPLETED_STATES:
        return

    # Feed the task outcome into a freshly loaded conductor.
    conductor, wf_ex_db = refresh_conductor(task_ex_db.workflow_execution)
    conductor.update_task_flow(
        task_ex_db.task_id,
        task_ex_db.status,
        result=task_ex_db.result
    )

    pending = conductor.get_next_tasks(task_ex_db.task_id)

    # Without follow-up tasks, just save the records to handle possible completion.
    if not pending:
        update_execution_records(wf_ex_db, conductor)
        return

    # Keep going while tasks are identified: a task without an action completes
    # immediately and may make a new set of tasks available.
    while pending:
        # Mark every task as running in the task flow before requesting any.
        for entry in pending:
            conductor.update_task_flow(entry['id'], states.RUNNING)

        update_execution_records(wf_ex_db, conductor)

        for entry in pending:
            try:
                request_task_execution(
                    wf_ex_db,
                    entry['id'],
                    entry['spec'],
                    entry['ctx'],
                    {'execution_id': wf_ex_db.action_execution}
                )
            except Exception as e:
                fail_workflow_execution(str(wf_ex_db.id), e, task_id=entry['id'])
                return

        # Reload state from the database before asking for the next batch.
        conductor, wf_ex_db = refresh_conductor(str(wf_ex_db.id))
        pending = conductor.get_next_tasks()
440
441
442
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
def update_task_execution(task_ex_id, ac_ex_status, ac_ex_result=None):
    """Store a completed or paused status, and optionally a result, on a task execution.

    Does nothing when the given status is neither completed nor paused.

    :param task_ex_id: Identifier of the task execution to update.
    :param ac_ex_status: Status to store on the task execution.
    :param ac_ex_result: Result to store; a falsy value keeps the existing result.
    """
    handled_states = states.COMPLETED_STATES + [states.PAUSED]

    if ac_ex_status not in handled_states:
        return

    task_ex_db = wf_db_access.TaskExecution.get_by_id(task_ex_id)
    task_ex_db.status = ac_ex_status
    task_ex_db.result = ac_ex_result or task_ex_db.result

    # The end timestamp is only set for completed states, not for a pause.
    if ac_ex_status in states.COMPLETED_STATES:
        task_ex_db.end_timestamp = date_utils.get_datetime_utc_now()

    wf_db_access.TaskExecution.update(task_ex_db, publish=False)
455
456
457
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
def resume_task_execution(task_ex_id):
    """Set a task execution back to RUNNING and save it without publishing.

    :param task_ex_id: Identifier of the task execution to resume.
    """
    resumed = wf_db_access.TaskExecution.get_by_id(task_ex_id)
    resumed.status = states.RUNNING

    wf_db_access.TaskExecution.update(resumed, publish=False)
465
466
467
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
def update_workflow_execution(wf_ex_id):
    """Save the execution records when the workflow is completed or paused.

    :param wf_ex_id: Identifier of the workflow execution to check.
    """
    conductor, wf_ex_db = refresh_conductor(wf_ex_id)
    handled_states = states.COMPLETED_STATES + [states.PAUSED]

    # Any other state means there is nothing to update yet.
    if conductor.get_workflow_state() not in handled_states:
        return

    update_execution_records(wf_ex_db, conductor)
475
476
477
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
def resume_workflow_execution(wf_ex_id, task_ex_id):
    """Set a workflow execution and one of its tasks back to RUNNING.

    :param wf_ex_id: Identifier of the workflow execution to resume.
    :param task_ex_id: Identifier of the task execution whose task is marked
        as running in the task flow.
    """
    conductor, wf_ex_db = refresh_conductor(wf_ex_id)
    conductor.set_workflow_state(states.RUNNING)

    # The task flow is keyed by task id, which lives on the task execution record.
    resumed_task = wf_db_access.TaskExecution.get_by_id(task_ex_id)
    conductor.update_task_flow(resumed_task.task_id, states.RUNNING)

    # Save the workflow execution along with its liveaction and action execution.
    update_execution_records(wf_ex_db, conductor)
489
490
491
@retrying.retry(retry_on_exception=wf_exc.retry_on_exceptions)
def fail_workflow_execution(wf_ex_id, exception, task_id=None):
    """Set a workflow execution to FAILED and log the error on its conductor.

    :param wf_ex_id: Identifier of the workflow execution to fail.
    :param exception: Error whose string form is logged on the conductor.
    :param task_id: Optional task id passed along with the logged error.
    """
    conductor, wf_ex_db = refresh_conductor(wf_ex_id)

    error_message = str(exception)
    conductor.set_workflow_state(states.FAILED)
    conductor.log_error(error_message, task_id=task_id)

    # Save the workflow execution along with its liveaction and action execution.
    update_execution_records(wf_ex_db, conductor)
501
502
503
def update_execution_records(wf_ex_db, conductor, update_lv_ac_on_states=None,
                             pub_wf_ex=False, pub_lv_ac=True, pub_ac_ex=True):
    """Save conductor state to the workflow execution and sync related records.

    The workflow execution is always updated. The corresponding liveaction and
    action execution are updated afterwards, unless ``update_lv_ac_on_states``
    is a collection that does not contain the resulting workflow status.

    :param wf_ex_db: Workflow execution record to update.
    :param conductor: Conductor providing the workflow state, errors, task flow
        and workflow output.
    :param update_lv_ac_on_states: Optional list, tuple, set or frozenset of
        statuses. When given, the liveaction and action execution are only
        updated if the workflow status is one of them. Any other value,
        including None, applies no filter.
    :param pub_wf_ex: ``publish`` flag for the workflow execution update.
    :param pub_lv_ac: ``publish`` flag for the liveaction status update.
    :param pub_ac_ex: ``publish`` flag for the action execution update.
    """
    wf_state = conductor.get_workflow_state()

    # Update timestamp and output if workflow is completed.
    if wf_state in states.COMPLETED_STATES:
        wf_ex_db.end_timestamp = date_utils.get_datetime_utc_now()
        wf_ex_db.output = conductor.get_workflow_output()

    # Update workflow status and task flow and write changes to database.
    wf_ex_db.status = wf_state
    wf_ex_db.errors = copy.deepcopy(conductor.errors)
    wf_ex_db.flow = conductor.flow.serialize()
    wf_ex_db = wf_db_access.WorkflowExecution.update(wf_ex_db, publish=pub_wf_ex)

    # Return if workflow execution status is not specified in update_lv_ac_on_states.
    # Tuples and sets are honored as well as lists, so a caller passing a
    # non-list collection does not have its filter silently ignored.
    has_filter = isinstance(update_lv_ac_on_states, (list, tuple, set, frozenset))

    if has_filter and wf_ex_db.status not in update_lv_ac_on_states:
        return

    # Update the corresponding liveaction and action execution for the workflow.
    wf_ac_ex_db = ex_db_access.ActionExecution.get_by_id(wf_ex_db.action_execution)
    wf_lv_ac_db = ac_db_util.get_liveaction_by_id(wf_ac_ex_db.liveaction['id'])

    # Gather result for liveaction and action execution; errors are only
    # included for abended workflow states.
    result = {'output': wf_ex_db.output or None}

    if wf_ex_db.status in states.ABENDED_STATES:
        result['errors'] = wf_ex_db.errors

    # Sync update with corresponding liveaction and action execution.
    wf_lv_ac_db = ac_db_util.update_liveaction_status(
        status=wf_ex_db.status,
        result=result,
        end_timestamp=wf_ex_db.end_timestamp,
        liveaction_db=wf_lv_ac_db,
        publish=pub_lv_ac)

    ex_svc.update_execution(wf_lv_ac_db, publish=pub_ac_ex)
540