Passed
Pull Request — master (#3507)
by W
04:31
created

MistralRunnerPauseResumeTest.test_pause()   B

Complexity

Conditions 1

Size

Total Lines 33

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
dl 0
loc 33
rs 8.8571
c 1
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import uuid
18
19
import mock
20
import yaml
21
22
from mistralclient.api.v2 import executions
23
from mistralclient.api.v2 import workflows
24
from oslo_config import cfg
25
26
# XXX: actionsensor import depends on config being set up.
27
import st2tests.config as tests_config
28
tests_config.parse_args()
29
30
from mistral_v2 import MistralRunner
31
from st2common.bootstrap import actionsregistrar
32
from st2common.bootstrap import runnersregistrar
33
from st2common.constants import action as action_constants
34
from st2common.models.db.liveaction import LiveActionDB
35
from st2common.persistence.liveaction import LiveAction
36
from st2common.runners import base as runners
37
from st2common.services import action as action_service
38
from st2common.transport.liveaction import LiveActionPublisher
39
from st2common.transport.publishers import CUDPublisher
40
from st2common.util import loader
41
from st2tests import DbTestCase
42
from st2tests import fixturesloader
43
from st2tests.mocks.liveaction import MockLiveActionPublisher
44
45
46
# Fixture pack exercised by this module and its location on disk.
TEST_PACK = 'mistral_tests'
TEST_PACK_PATH = '/'.join((fixturesloader.get_fixtures_packs_base_path(), TEST_PACK))

# Packs whose actions are registered in setUpClass.
PACKS = [
    TEST_PACK_PATH,
    fixturesloader.get_fixtures_packs_base_path() + '/core'
]

# Action executions requirements
ACTION_PARAMS = {'friend': 'Rocky'}
NON_EMPTY_RESULT = 'non-empty'

# Non-workbook with a single workflow
WF1_META_FILE_NAME = 'workflow_v2.yaml'
WF1_META_FILE_PATH = TEST_PACK_PATH + '/actions/' + WF1_META_FILE_NAME
WF1_META_CONTENT = loader.load_meta_file(WF1_META_FILE_PATH)
WF1_NAME = '.'.join((WF1_META_CONTENT['pack'], WF1_META_CONTENT['name']))
WF1_ENTRY_POINT = TEST_PACK_PATH + '/actions/' + WF1_META_CONTENT['entry_point']

# Entry point with the meta file name swapped for its 'xformed_' counterpart.
WF1_ENTRY_POINT_X = WF1_ENTRY_POINT.replace(
    WF1_META_FILE_NAME,
    'xformed_' + WF1_META_FILE_NAME
)

# Load the workflow definition and dump it back to block-style YAML.
WF1_SPEC = yaml.safe_load(MistralRunner.get_workflow_definition(WF1_ENTRY_POINT_X))
WF1_YAML = yaml.safe_dump(WF1_SPEC, default_flow_style=False)

# Workflow objects handed back by the mocked mistralclient workflow manager.
WF1 = workflows.Workflow(
    None,
    {'name': WF1_NAME, 'definition': WF1_YAML}
)
WF1_OLD = workflows.Workflow(
    None,
    {'name': WF1_NAME, 'definition': ''}
)

# Execution payloads: the running one and an independent copy marked as paused.
WF1_EXEC = {
    'id': str(uuid.uuid4()),
    'state': 'RUNNING',
    'workflow_name': WF1_NAME
}
WF1_EXEC_PAUSED = copy.deepcopy(WF1_EXEC)
WF1_EXEC_PAUSED.update({'state': 'PAUSED'})
72
73
74
@mock.patch.object(
    CUDPublisher,
    'publish_update',
    mock.MagicMock(return_value=None))
@mock.patch.object(
    CUDPublisher,
    'publish_create',
    mock.MagicMock(side_effect=MockLiveActionPublisher.publish_create))
@mock.patch.object(
    LiveActionPublisher,
    'publish_state',
    mock.MagicMock(side_effect=MockLiveActionPublisher.publish_state))
class MistralRunnerPauseResumeTest(DbTestCase):
    """Tests for pausing and resuming a Mistral workflow via the action service.

    The class-level patches replace the publisher methods with mocks (two of
    them delegating to MockLiveActionPublisher), and each test additionally
    patches the mistralclient workflow and execution managers.
    """

    @classmethod
    def setUpClass(cls):
        """Apply config overrides, then register the runners and the test packs."""
        super(MistralRunnerPauseResumeTest, cls).setUpClass()

        # Override the retry configuration here otherwise st2tests.config.parse_args
        # in DbTestCase.setUpClass will reset these overrides.
        cfg.CONF.set_override('retry_exp_msec', 100, group='mistral')
        cfg.CONF.set_override('retry_exp_max_msec', 200, group='mistral')
        cfg.CONF.set_override('retry_stop_max_msec', 200, group='mistral')
        cfg.CONF.set_override('api_url', 'http://0.0.0.0:9101', group='auth')

        # Register runners.
        runnersregistrar.register_runners()

        # Register test pack(s).
        actions_registrar = actionsregistrar.ActionsRegistrar(
            use_pack_cache=False,
            fail_on_failure=True
        )

        for pack in PACKS:
            actions_registrar.register_from_pack(pack)

    @classmethod
    def get_runner_class(cls, runner_name):
        """Return the class of the runner object obtained for ``runner_name``."""
        return runners.get_runner(runner_name).__class__

    def _launch_workflow(self):
        """Request the WF1 action and return the reloaded, running liveaction.

        Asserts that the liveaction reached the running status and that its
        ``mistral`` context holds the execution id and workflow name of the
        mocked execution (WF1_EXEC).
        """
        liveaction = LiveActionDB(action=WF1_NAME, parameters=ACTION_PARAMS)
        liveaction, _ = action_service.request(liveaction)

        # Reload from the database to observe the persisted status and context.
        liveaction = LiveAction.get_by_id(str(liveaction.id))
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)

        mistral_context = liveaction.context.get('mistral', None)
        self.assertIsNotNone(mistral_context)
        self.assertEqual(mistral_context['execution_id'], WF1_EXEC.get('id'))
        self.assertEqual(mistral_context['workflow_name'], WF1_EXEC.get('workflow_name'))

        return liveaction

    def _pause_workflow(self, liveaction, requester):
        """Request a pause for ``liveaction`` and return the reloaded liveaction.

        Asserts that the mocked execution manager was last asked to move the
        execution to 'PAUSED' and that the liveaction is in the pausing status.
        """
        liveaction, _ = action_service.request_pause(liveaction, requester)
        executions.ExecutionManager.update.assert_called_with(WF1_EXEC.get('id'), 'PAUSED')

        liveaction = LiveAction.get_by_id(str(liveaction.id))
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSING)

        return liveaction

    @mock.patch.object(
        workflows.WorkflowManager, 'list',
        mock.MagicMock(return_value=[]))
    @mock.patch.object(
        workflows.WorkflowManager, 'get',
        mock.MagicMock(return_value=WF1))
    @mock.patch.object(
        workflows.WorkflowManager, 'create',
        mock.MagicMock(return_value=[WF1]))
    @mock.patch.object(
        executions.ExecutionManager, 'create',
        mock.MagicMock(return_value=executions.Execution(None, WF1_EXEC)))
    @mock.patch.object(
        executions.ExecutionManager, 'update',
        mock.MagicMock(return_value=executions.Execution(None, WF1_EXEC_PAUSED)))
    def test_pause(self):
        # Launch the workflow execution.
        liveaction = self._launch_workflow()

        # Pause the workflow execution.
        requester = cfg.CONF.system_user.user
        self._pause_workflow(liveaction, requester)

    @mock.patch.object(
        workflows.WorkflowManager, 'list',
        mock.MagicMock(return_value=[]))
    @mock.patch.object(
        workflows.WorkflowManager, 'get',
        mock.MagicMock(return_value=WF1))
    @mock.patch.object(
        workflows.WorkflowManager, 'create',
        mock.MagicMock(return_value=[WF1]))
    @mock.patch.object(
        executions.ExecutionManager, 'create',
        mock.MagicMock(return_value=executions.Execution(None, WF1_EXEC)))
    @mock.patch.object(
        executions.ExecutionManager, 'update',
        mock.MagicMock(side_effect=[
            executions.Execution(None, WF1_EXEC_PAUSED),
            executions.Execution(None, WF1_EXEC)]))
    def test_resume(self):
        # Launch the workflow execution.
        liveaction = self._launch_workflow()

        # Pause the workflow execution.
        requester = cfg.CONF.system_user.user
        liveaction = self._pause_workflow(liveaction, requester)

        # Manually update the liveaction from pausing to paused. The paused state
        # is usually updated by the mistral querier.
        action_service.update_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
        liveaction = LiveAction.get_by_id(str(liveaction.id))
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED)

        # Resume the workflow execution.
        liveaction, _ = action_service.request_resume(liveaction, requester)
        executions.ExecutionManager.update.assert_called_with(WF1_EXEC.get('id'), 'RUNNING')
        liveaction = LiveAction.get_by_id(str(liveaction.id))
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
196