Test Failed
Push — master ( e380d0...f5671d )
by W
02:58
created

st2stream/tests/unit/controllers/v1/test_stream.py (1 issue)

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import mock
17
18
from oslo_config import cfg
19
20
from st2common.models.api.action import ActionAPI
21
from st2common.models.api.action import RunnerTypeAPI
22
from st2common.models.api.execution import ActionExecutionAPI
23
from st2common.models.api.execution import LiveActionAPI
24
from st2common.models.api.execution import ActionExecutionOutputAPI
25
from st2common.models.db.liveaction import LiveActionDB
26
from st2common.models.db.execution import ActionExecutionDB
27
from st2common.models.db.execution import ActionExecutionOutputDB
28
from st2common.persistence.action import Action, RunnerType
29
import st2common.stream.listener
30
from st2stream.controllers.v1 import stream
31
from st2tests.api import SUPER_SECRET_PARAMETER
32
33
from .base import FunctionalTest
34
35
36
RUNNER_TYPE_1 = {
37
    'description': '',
38
    'enabled': True,
39
    'name': 'local-shell-cmd',
40
    'runner_module': 'local_runner',
41
    'runner_parameters': {}
42
}
43
44
ACTION_1 = {
45
    'name': 'st2.dummy.action1',
46
    'description': 'test description',
47
    'enabled': True,
48
    'entry_point': '/tmp/test/action1.sh',
49
    'pack': 'sixpack',
50
    'runner_type': 'local-shell-cmd',
51
    'parameters': {
52
        'a': {
53
            'type': 'string',
54
            'default': 'abc'
55
        },
56
        'b': {
57
            'type': 'number',
58
            'default': 123
59
        },
60
        'c': {
61
            'type': 'number',
62
            'default': 123,
63
            'immutable': True
64
        },
65
        'd': {
66
            'type': 'string',
67
            'secret': True
68
        }
69
    }
70
}
71
72
LIVE_ACTION_1 = {
73
    'action': 'sixpack.st2.dummy.action1',
74
    'parameters': {
75
        'hosts': 'localhost',
76
        'cmd': 'uname -a',
77
        'd': SUPER_SECRET_PARAMETER
78
    }
79
}
80
81
EXECUTION_1 = {
82
    'id': '598dbf0c0640fd54bffc688b',
83
    'action': {
84
        'ref': 'sixpack.st2.dummy.action1'
85
    },
86
    'parameters': {
87
        'hosts': 'localhost',
88
        'cmd': 'uname -a',
89
        'd': SUPER_SECRET_PARAMETER
90
    }
91
}
92
93
STDOUT_1 = {
94
    'execution_id': '598dbf0c0640fd54bffc688b',
95
    'action_ref': 'dummy.action1',
96
    'output_type': 'stdout'
97
}
98
99
STDERR_1 = {
100
    'execution_id': '598dbf0c0640fd54bffc688b',
101
    'action_ref': 'dummy.action1',
102
    'output_type': 'stderr'
103
}
104
105
106
class META(object):
107
    delivery_info = {}
108
109
    def __init__(self, exchange='some', routing_key='thing'):
110
        self.delivery_info['exchange'] = exchange
111
        self.delivery_info['routing_key'] = routing_key
112
113
    def ack(self):
114
        pass
115
116
117
class TestStreamController(FunctionalTest):
118
119
    @classmethod
120
    def setUpClass(cls):
121
        super(TestStreamController, cls).setUpClass()
122
123
        instance = RunnerTypeAPI(**RUNNER_TYPE_1)
124
        RunnerType.add_or_update(RunnerTypeAPI.to_model(instance))
125
126
        instance = ActionAPI(**ACTION_1)
127
        Action.add_or_update(ActionAPI.to_model(instance))
128
129
    @mock.patch.object(st2common.stream.listener, 'listen', mock.Mock())
130
    @mock.patch('st2stream.controllers.v1.stream.DEFAULT_EVENTS_WHITELIST', None)
131
    def test_get_all(self):
132
        resp = stream.StreamController().get_all()
133
        self.assertEqual(resp._status, '200 OK')
134
        self.assertIn(('Content-Type', 'text/event-stream; charset=UTF-8'), resp._headerlist)
135
136
        listener = st2common.stream.listener.get_listener(name='stream')
137
        process = listener.processor(LiveActionAPI)
138
139
        message = None
140
141
        for message in resp._app_iter:
142
            message = message.decode('utf-8')
143
            if message != '\n':
144
                break
145
            process(LiveActionDB(**LIVE_ACTION_1), META())
146
147
        self.assertIn('event: some__thing', message)
148
        self.assertIn('data: {"', message)
149
        self.assertNotIn(SUPER_SECRET_PARAMETER, message)
150
151
    @mock.patch.object(st2common.stream.listener, 'listen', mock.Mock())
152
    def test_get_all_with_filters(self):
153
        cfg.CONF.set_override(name='heartbeat', group='stream', override=0.1)
154
155
        listener = st2common.stream.listener.get_listener(name='stream')
156
        process_execution = listener.processor(ActionExecutionAPI)
157
        process_liveaction = listener.processor(LiveActionAPI)
158
        process_output = listener.processor(ActionExecutionOutputAPI)
159
        process_no_api_model = listener.processor()
160
161
        execution_api = ActionExecutionDB(**EXECUTION_1)
162
        liveaction_api = LiveActionDB(**LIVE_ACTION_1)
163
        output_api_stdout = ActionExecutionOutputDB(**STDOUT_1)
164
        output_api_stderr = ActionExecutionOutputDB(**STDERR_1)
165
166
        def dispatch_and_handle_mock_data(resp):
167
            received_messages_data = ''
168
            for index, message in enumerate(resp._app_iter):
0 ignored issues
show
Coding Style Best Practice introduced by
It seems like _app_iter was declared protected and should not be accessed from this context.

Prefixing a member variable _ is usually regarded as the equivalent of declaring it with protected visibility that exists in other languages. Consequentially, such a member should only be accessed from the same class or a child class:

class MyParent:
    def __init__(self):
        self._x = 1;
        self.y = 2;

class MyChild(MyParent):
    def some_method(self):
        return self._x    # Ok, since accessed from a child class

class AnotherClass:
    def some_method(self, instance_of_my_child):
        return instance_of_my_child._x   # Would be flagged as AnotherClass is not
                                         # a child class of MyParent
Loading history...
169
                if message.strip():
170
                    received_messages_data += message.decode('utf-8')
171
172
                # Dispatch some mock events
173
                if index == 0:
174
                    meta = META('st2.execution', 'create')
175
                    process_execution(execution_api, meta)
176
                elif index == 1:
177
                    meta = META('st2.execution', 'update')
178
                    process_execution(execution_api, meta)
179
                elif index == 2:
180
                    meta = META('st2.execution', 'delete')
181
                    process_execution(execution_api, meta)
182
                elif index == 3:
183
                    meta = META('st2.liveaction', 'create')
184
                    process_liveaction(liveaction_api, meta)
185
                elif index == 4:
186
                    meta = META('st2.liveaction', 'create')
187
                    process_liveaction(liveaction_api, meta)
188
                elif index == 5:
189
                    meta = META('st2.liveaction', 'delete')
190
                    process_liveaction(liveaction_api, meta)
191
                elif index == 6:
192
                    meta = META('st2.liveaction', 'delete')
193
                    process_liveaction(liveaction_api, meta)
194
                elif index == 7:
195
                    meta = META('st2.announcement', 'chatops')
196
                    process_no_api_model({}, meta)
197
                elif index == 8:
198
                    meta = META('st2.execution.output', 'create')
199
                    process_output(output_api_stdout, meta)
200
                elif index == 9:
201
                    meta = META('st2.execution.output', 'create')
202
                    process_output(output_api_stderr, meta)
203
                elif index == 10:
204
                    meta = META('st2.announcement', 'errbot')
205
                    process_no_api_model({}, meta)
206
207
                else:
208
                    break
209
210
            received_messages = received_messages_data.split('\n\n')
211
            received_messages = [message for message in received_messages if message]
212
            return received_messages
213
214
        # 1. Default filter - stdout and stderr messages should be excluded for backward
215
        # compatibility reasons
216
        resp = stream.StreamController().get_all()
217
218
        received_messages = dispatch_and_handle_mock_data(resp)
219
        self.assertEqual(len(received_messages), 9)
220
        self.assertTrue('st2.execution__create' in received_messages[0])
221
        self.assertTrue('st2.liveaction__delete' in received_messages[5])
222
        self.assertTrue('st2.announcement__chatops' in received_messages[7])
223
        self.assertTrue('st2.announcement__errbot' in received_messages[8])
224
225
        # 1. ?events= filter
226
        # No filter provided - all messages should be received
227
        stream.DEFAULT_EVENTS_WHITELIST = None
228
        resp = stream.StreamController().get_all()
229
230
        received_messages = dispatch_and_handle_mock_data(resp)
231
        self.assertEqual(len(received_messages), 11)
232
        self.assertTrue('st2.execution__create' in received_messages[0])
233
        self.assertTrue('st2.announcement__chatops' in received_messages[7])
234
        self.assertTrue('st2.execution.output__create' in received_messages[8])
235
        self.assertTrue('st2.execution.output__create' in received_messages[9])
236
        self.assertTrue('st2.announcement__errbot' in received_messages[10])
237
238
        # Filter provided, only three messages should be received
239
        events = ['st2.execution__create', 'st2.liveaction__delete']
240
        resp = stream.StreamController().get_all(events=events)
241
242
        received_messages = dispatch_and_handle_mock_data(resp)
243
        self.assertEqual(len(received_messages), 3)
244
        self.assertTrue('st2.execution__create' in received_messages[0])
245
        self.assertTrue('st2.liveaction__delete' in received_messages[1])
246
        self.assertTrue('st2.liveaction__delete' in received_messages[2])
247
248
        # Filter provided, only three messages should be received
249
        events = ['st2.liveaction__create', 'st2.liveaction__delete']
250
        resp = stream.StreamController().get_all(events=events)
251
252
        received_messages = dispatch_and_handle_mock_data(resp)
253
        self.assertEqual(len(received_messages), 4)
254
        self.assertTrue('st2.liveaction__create' in received_messages[0])
255
        self.assertTrue('st2.liveaction__create' in received_messages[1])
256
        self.assertTrue('st2.liveaction__delete' in received_messages[2])
257
        self.assertTrue('st2.liveaction__delete' in received_messages[3])
258
259
        # Glob filter
260
        events = ['st2.announcement__*']
261
        resp = stream.StreamController().get_all(events=events)
262
263
        received_messages = dispatch_and_handle_mock_data(resp)
264
        self.assertEqual(len(received_messages), 2)
265
        self.assertTrue('st2.announcement__chatops' in received_messages[0])
266
        self.assertTrue('st2.announcement__errbot' in received_messages[1])
267
268
        # Filter provided
269
        events = ['st2.execution.output__create']
270
        resp = stream.StreamController().get_all(events=events)
271
272
        received_messages = dispatch_and_handle_mock_data(resp)
273
        self.assertEqual(len(received_messages), 2)
274
        self.assertTrue('st2.execution.output__create' in received_messages[0])
275
        self.assertTrue('st2.execution.output__create' in received_messages[1])
276
277
        # Filter provided, invalid , no message should be received
278
        events = ['invalid1', 'invalid2']
279
        resp = stream.StreamController().get_all(events=events)
280
281
        received_messages = dispatch_and_handle_mock_data(resp)
282
        self.assertEqual(len(received_messages), 0)
283
284
        # 2. ?action_refs= filter
285
        action_refs = ['invalid1', 'invalid2']
286
        resp = stream.StreamController().get_all(action_refs=action_refs)
287
288
        received_messages = dispatch_and_handle_mock_data(resp)
289
        self.assertEqual(len(received_messages), 0)
290
291
        action_refs = ['dummy.action1']
292
        resp = stream.StreamController().get_all(action_refs=action_refs)
293
294
        received_messages = dispatch_and_handle_mock_data(resp)
295
        self.assertEqual(len(received_messages), 2)
296
297
        # 3. ?execution_ids= filter
298
        execution_ids = ['invalid1', 'invalid2']
299
        resp = stream.StreamController().get_all(execution_ids=execution_ids)
300
301
        received_messages = dispatch_and_handle_mock_data(resp)
302
        self.assertEqual(len(received_messages), 0)
303
304
        execution_ids = [EXECUTION_1['id']]
305
        resp = stream.StreamController().get_all(execution_ids=execution_ids)
306
307
        received_messages = dispatch_and_handle_mock_data(resp)
308
        self.assertEqual(len(received_messages), 5)
309