Test Setup Failed
Pull Request — master (#4154)
by W
03:25
created

WorkflowControlTestCaseMixin   A

Complexity

Total Complexity 5

Size/Duplication

Total Lines 13
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 13
rs 10
wmc 5

2 Methods

Rating   Name   Duplication   Size   Complexity  
A _delete_temp_file() 0 6 4
A _create_temp_file() 0 4 1
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
18
import os
19
import retrying
20
import shutil
21
import six
22
import tempfile
23
import unittest2
24
25
from st2client import client as st2
26
from st2client import models
27
from st2common.constants import action as action_constants
28
29
30
# Statuses accepted for an execution immediately after it has been created
# (checked by TestWorkflowExecution._execute_workflow).
LIVEACTION_LAUNCHED_STATUSES = [
    action_constants.LIVEACTION_STATUS_REQUESTED,
    action_constants.LIVEACTION_STATUS_SCHEDULED,
    action_constants.LIVEACTION_STATUS_RUNNING,
]
35
36
37
def retry_on_exceptions(exc):
    """Predicate for ``retrying.retry``: retry only on assertion failures.

    :param exc: The exception raised by the wrapped call.
    :return: ``True`` when ``exc`` is an ``AssertionError`` (or a subclass),
        ``False`` for anything else.
    """
    should_retry = isinstance(exc, AssertionError)
    return should_retry
39
40
41
class WorkflowControlTestCaseMixin(object):
    """Mixin with helpers for creating and removing temporary files."""

    def _create_temp_file(self):
        """Create an empty temporary file with mode 0o755 and return its path.

        The file is not removed automatically; pass the returned path to
        ``_delete_temp_file`` once it is no longer needed.

        :return: Absolute path of the newly created file.
        """
        fd, temp_file_path = tempfile.mkstemp()
        # mkstemp() returns an already-open OS-level descriptor. Only the path
        # is needed here, so close it right away to avoid leaking one
        # descriptor per call.
        os.close(fd)
        os.chmod(temp_file_path, 0o755)     # nosec
        return temp_file_path

    def _delete_temp_file(self, temp_file_path):
        """Remove the file or directory tree at ``temp_file_path``.

        Does nothing when the path is empty/``None`` or does not exist.

        :param temp_file_path: Path of a file or directory to remove.
        """
        if not temp_file_path or not os.path.exists(temp_file_path):
            return

        if os.path.isdir(temp_file_path):
            shutil.rmtree(temp_file_path)
        else:
            os.remove(temp_file_path)
54
55
56
class TestWorkflowExecution(unittest2.TestCase):
    """Test case base with helpers to launch executions and poll their state.

    The polling helpers are wrapped with ``retrying.retry`` and are retried
    only when they fail with an ``AssertionError`` (see
    ``retry_on_exceptions``). Any other exception ends the wait immediately.
    """

    @classmethod
    def setUpClass(cls):
        # A single API client is shared by all tests of the class.
        cls.st2client = st2.Client(base_url='http://127.0.0.1')

    def _execute_workflow(self, action, parameters=None):
        """Create an execution of ``action`` and verify that it was launched.

        :param action: Reference of the action to execute.
        :param parameters: Optional dict of action parameters; an empty dict
            is sent when omitted.
        :return: The execution object returned by the API.
        """
        ex = models.LiveAction(action=action, parameters=(parameters or {}))
        ex = self.st2client.liveactions.create(ex)
        self.assertIsNotNone(ex.id)
        self.assertEqual(ex.action['ref'], action)
        self.assertIn(ex.status, LIVEACTION_LAUNCHED_STATUSES)

        return ex

    @retrying.retry(
        retry_on_exception=retry_on_exceptions,
        wait_fixed=3000, stop_max_delay=900000)
    def _wait_for_state(self, ex, states):
        """Poll the execution until its status is one of ``states``.

        :param ex: Execution object; only ``id`` and ``status`` are used.
        :param states: A single status string or an iterable of statuses.
        :return: The refreshed execution object.
        :raises ValueError: If a requested status is not a known status.
        :raises Exception: If the check fails while the execution is already
            in a completed state, since further polling cannot succeed.
        """
        if isinstance(states, six.string_types):
            states = [states]

        for state in states:
            if state not in action_constants.LIVEACTION_STATUSES:
                raise ValueError('Status %s is not valid.' % state)

        try:
            ex = self.st2client.liveactions.get_by_id(ex.id)
            self.assertIn(ex.status, states)
        except Exception:
            # ``Exception`` (not a bare ``except``) so that KeyboardInterrupt
            # and SystemExit are never rewritten into the generic error below.
            # The API call is inside the ``try``, so client errors are handled
            # here as well, as before.
            if ex.status in action_constants.LIVEACTION_COMPLETED_STATES:
                # A plain Exception is not retried, which aborts the wait.
                raise Exception(
                    'Execution is in completed state and does not '
                    'match expected state(s).'
                )
            else:
                raise

        return ex

    def _get_children(self, ex):
        """Return the executions whose parent is ``ex``."""
        return self.st2client.liveactions.query(parent=ex.id)

    @retrying.retry(
        retry_on_exception=retry_on_exceptions,
        wait_fixed=3000, stop_max_delay=900000)
    def _wait_for_task(self, ex, task, status, num_task_exs=1):
        """Poll until ``num_task_exs`` child executions of ``task`` have ``status``.

        :param ex: Parent execution object.
        :param task: Task name, matched against the ``task_name`` entry of the
            child's ``orchestra`` context.
        :param status: Status the matching child executions must have.
        :param num_task_exs: Exact number of matching child executions expected.
        :return: List of the matching child executions.
        :raises Exception: If the expectation is not met while the parent
            execution is already in a completed state.
        """
        ex = self.st2client.liveactions.get_by_id(ex.id)

        task_exs = [
            task_ex for task_ex in self._get_children(ex)
            if (task_ex.context.get('orchestra', {}).get('task_name', '') == task and
                task_ex.status == status)
        ]

        try:
            self.assertEqual(len(task_exs), num_task_exs)
            self.assertTrue(all(task_ex.status == status for task_ex in task_exs))
        except AssertionError:
            # Only the assertions above can fail here, so catch exactly that.
            if ex.status in action_constants.LIVEACTION_COMPLETED_STATES:
                # A plain Exception is not retried, which aborts the wait.
                raise Exception(
                    'Execution is in completed state and does not '
                    'match expected task.'
                )
            else:
                raise

        return task_exs

    @retrying.retry(
        retry_on_exception=retry_on_exceptions,
        wait_fixed=3000, stop_max_delay=900000)
    def _wait_for_completion(self, ex):
        """Poll until the execution is completed and exposes a ``result``.

        :param ex: Execution object to wait on.
        :return: The refreshed, completed execution object.
        :raises Exception: If the completed execution has no ``result``
            attribute.
        """
        ex = self._wait_for_state(ex, action_constants.LIVEACTION_COMPLETED_STATES)

        try:
            self.assertTrue(hasattr(ex, 'result'))
        except AssertionError:
            if ex.status in action_constants.LIVEACTION_COMPLETED_STATES:
                raise Exception(
                    'Execution is in completed state and does not '
                    'contain expected result.'
                )
            else:
                raise

        return ex
143