Passed
Pull Request — master (#3627)
by W
05:57
created

TestWorkflowExecution._wait_for_completion()   B

Complexity

Conditions 5

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
dl 0
loc 14
rs 8.5454
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import retrying
17
import unittest2
18
19
from st2client import client as st2
20
from st2client import models
21
22
23
class TestWorkflowExecution(unittest2.TestCase):
    """Helpers for tests that run workflows through the st2 client.

    The class itself defines no test methods; it provides routines to start
    an action execution, poll it until it reaches a wanted status, and assert
    on the final status and on the states of the tasks listed in its result.
    """

    @classmethod
    def setUpClass(cls):
        """Create the st2 client shared by every test in the class."""
        cls.st2client = st2.Client(base_url='http://127.0.0.1')

    def _execute_workflow(self, action, parameters=None):
        """Submit ``action`` for execution and return the created execution.

        :param action: Reference of the action to run.
        :param parameters: Parameters passed to the action; an empty dict is
            used when omitted.
        :return: The execution object returned by the client, after checking
            that it has an id, echoes the requested action ref and is in one
            of the 'requested', 'scheduled' or 'running' statuses.
        """
        request = models.LiveAction(
            action=action,
            parameters={} if parameters is None else parameters,
        )
        created = self.st2client.liveactions.create(request)

        self.assertIsNotNone(created.id)
        self.assertEqual(created.action['ref'], action)
        self.assertIn(created.status, ['requested', 'scheduled', 'running'])

        return created

    # The failed status assertion below is what makes the decorator try again:
    # wait_fixed / stop_max_delay are given in milliseconds (3 s between
    # attempts, 15 min overall).
    @retrying.retry(wait_fixed=3000, stop_max_delay=900000)
    def _wait_for_state(self, execution, states):
        """Re-fetch ``execution`` until its status is one of ``states``.

        :param execution: Execution whose ``id`` is looked up again.
        :param states: Collection of acceptable status values.
        :return: The freshly fetched execution.
        """
        latest = self.st2client.liveactions.get_by_id(execution.id)
        self.assertIn(latest.status, states)
        return latest

    # Retried as a whole, so a finished execution whose result does not yet
    # satisfy the task checks is fetched and inspected again.
    @retrying.retry(wait_fixed=3000, stop_max_delay=900000)
    def _wait_for_completion(self, execution, expect_tasks=True, expect_tasks_completed=True):
        """Wait until ``execution`` is 'succeeded', 'failed' or 'canceled'.

        :param execution: Execution to wait for.
        :param expect_tasks: When true, require a non-empty 'tasks' entry in
            the execution result.
        :param expect_tasks_completed: When true (and ``expect_tasks`` is
            true), require every task state to be 'SUCCESS', 'ERROR' or
            'CANCELLED'.
        :return: The fetched execution that satisfied the checks.
        """
        finished = self._wait_for_state(execution, ['succeeded', 'failed', 'canceled'])
        self.assertTrue(hasattr(finished, 'result'))

        if not expect_tasks:
            return finished

        self.assertIn('tasks', finished.result)
        self.assertGreater(len(finished.result['tasks']), 0)

        if expect_tasks_completed:
            task_states = [task['state'] for task in finished.result['tasks']]
            finished_states = ['SUCCESS', 'ERROR', 'CANCELLED']
            self.assertTrue(all(state in finished_states for state in task_states))

        return finished

    def _assert_success(self, execution, num_tasks=0):
        """Assert ``execution`` succeeded with exactly ``num_tasks`` tasks, all 'SUCCESS'.

        :param execution: Execution to inspect.
        :param num_tasks: Expected number of entries under result 'tasks'
            (a missing 'tasks' key counts as zero).
        """
        self.assertEqual(execution.status, 'succeeded')

        task_list = execution.result.get('tasks', [])
        self.assertEqual(num_tasks, len(task_list))

        task_states = [task['state'] for task in task_list]
        self.assertTrue(all(state == 'SUCCESS' for state in task_states))

    def _assert_failure(self, execution, expect_tasks_failure=True):
        """Assert ``execution`` failed.

        :param execution: Execution to inspect.
        :param expect_tasks_failure: When true, additionally require at least
            one task in the result to be in the 'ERROR' state.
        """
        self.assertEqual(execution.status, 'failed')
        task_list = execution.result.get('tasks', [])

        if not expect_tasks_failure:
            return

        task_states = [task['state'] for task in task_list]
        self.assertTrue(any(state == 'ERROR' for state in task_states))

    def _assert_canceled(self, execution, are_tasks_completed=False):
        """Assert ``execution`` was canceled.

        :param execution: Execution to inspect.
        :param are_tasks_completed: When true, additionally require every
            task state to be 'SUCCESS', 'ERROR' or 'CANCELLED'.
        """
        self.assertEqual(execution.status, 'canceled')
        task_list = execution.result.get('tasks', [])

        if not are_tasks_completed:
            return

        task_states = [task['state'] for task in task_list]
        finished_states = ['SUCCESS', 'ERROR', 'CANCELLED']
        self.assertTrue(all(state in finished_states for state in task_states))
81