Completed
Push — master ( d1f0a7...3b2ece )
by Edward
21:04 queued 05:38
created

integration.mistral.TestWorkflowExecution   A

Complexity

Total Complexity 15

Size/Duplication

Total Lines 56
Duplicated Lines 0 %
Metric Value
wmc 15
dl 0
loc 56
rs 10

7 Methods

Rating   Name   Duplication   Size   Complexity  
A setUpClass() 0 3 1
A _execute_workflow() 0 11 2
A _wait_for_state() 0 5 1
A _assert_success() 0 5 2
A _assert_canceled() 0 8 4
A _assert_failure() 0 4 2
A _wait_for_completion() 0 12 3
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import retrying
17
import unittest2
18
19
from st2client import client as st2
20
from st2client import models
21
22
23
class TestWorkflowExecution(unittest2.TestCase):
    """Base test case with helpers for launching and checking workflow executions.

    The helpers talk to an st2 API through ``st2client`` and use the
    ``unittest`` assertion methods both as checks and (under the ``retrying``
    decorator) as the polling condition.
    """

    @classmethod
    def setUpClass(cls):
        """Create the st2 client shared by all tests in this class."""
        cls.st2client = st2.Client(base_url='http://localhost')

    def _execute_workflow(self, action, parameters=None):
        """Request execution of ``action`` and return the created execution.

        :param action: Reference of the action to run; also compared against
            ``execution.action['ref']`` of the created execution.
        :param parameters: Parameters passed to the action. ``None`` is
            replaced with an empty dict.
        :return: The execution object returned by ``liveactions.create``.
        """
        request = models.LiveAction(
            action=action,
            parameters={} if parameters is None else parameters,
        )
        execution = self.st2client.liveactions.create(request)

        # Sanity-check what the API handed back before the caller polls on it.
        self.assertIsNotNone(execution.id)
        self.assertEqual(execution.action['ref'], action)
        self.assertIn(execution.status, ['requested', 'scheduled', 'running'])

        return execution

    @retrying.retry(wait_fixed=3000, stop_max_delay=900000)
    def _wait_for_state(self, execution, states):
        """Re-fetch ``execution`` and assert that its status is in ``states``.

        The ``retrying`` decorator re-runs this method whenever it raises
        (including a failed assertion). Per the ``retrying`` documentation the
        arguments are milliseconds: 3 s between attempts, 15 min overall.

        :param execution: Execution whose ``id`` is looked up.
        :param states: Container of acceptable status values.
        :return: The freshly fetched execution.
        """
        latest = self.st2client.liveactions.get_by_id(execution.id)
        self.assertIn(latest.status, states)
        return latest

    @retrying.retry(wait_fixed=3000, stop_max_delay=900000)
    def _wait_for_completion(self, execution, expect_tasks_completed=True):
        """Wait until ``execution`` is succeeded, failed or canceled.

        Also asserts that the execution has a ``result`` containing a
        non-empty ``'tasks'`` entry. This method is itself retry-decorated, so
        a failure of any assertion below triggers another full poll via
        ``_wait_for_state``.

        :param expect_tasks_completed: When true, additionally require every
            task's ``'state'`` to be ``'SUCCESS'`` or ``'ERROR'``.
        :return: The execution as last fetched by ``_wait_for_state``.
        """
        finished = self._wait_for_state(execution, ['succeeded', 'failed', 'canceled'])
        self.assertTrue(hasattr(finished, 'result'))
        self.assertIn('tasks', finished.result)

        tasks = finished.result['tasks']
        self.assertGreater(len(tasks), 0)

        if expect_tasks_completed:
            # Collect states eagerly so a task without 'state' raises KeyError.
            task_states = [task['state'] for task in tasks]
            self.assertTrue(all(state in ('SUCCESS', 'ERROR') for state in task_states))

        return finished

    def _assert_success(self, execution, num_tasks=0):
        """Assert ``execution`` succeeded with exactly ``num_tasks`` successful tasks.

        :param execution: Execution to inspect; ``result`` must support ``get``.
        :param num_tasks: Expected number of entries under ``result['tasks']``.
        """
        self.assertEqual(execution.status, 'succeeded')

        tasks = execution.result.get('tasks', [])
        self.assertEqual(num_tasks, len(tasks))

        task_states = [task['state'] for task in tasks]
        self.assertTrue(all(state == 'SUCCESS' for state in task_states))

    def _assert_failure(self, execution):
        """Assert ``execution`` failed and at least one task is in ``'ERROR'`` state."""
        self.assertEqual(execution.status, 'failed')

        task_states = [task['state'] for task in execution.result.get('tasks', [])]
        self.assertTrue(any(state == 'ERROR' for state in task_states))

    def _assert_canceled(self, execution, are_tasks_completed=False):
        """Assert ``execution`` was canceled and check the state of its tasks.

        :param are_tasks_completed: When true, every task must be ``'SUCCESS'``
            or ``'ERROR'``; otherwise at least one task must be ``'RUNNING'``.
        """
        self.assertEqual(execution.status, 'canceled')

        task_states = [task['state'] for task in execution.result.get('tasks', [])]

        if are_tasks_completed:
            self.assertTrue(all(state in ('SUCCESS', 'ERROR') for state in task_states))
        else:
            self.assertTrue(any(state == 'RUNNING' for state in task_states))
79