Issues (8)

st2tests/st2tests/api.py (1 issue)

possibly unsafe usage of loop variables

Bug Major
1
# Copyright 2019 Extreme Networks, Inc.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
"""
16
Various base classes and test utility functions for API related tests.
17
"""
18
19
from __future__ import absolute_import
20
21
import json
22
23
import six
24
import webtest
25
import mock
26
27
from oslo_config import cfg
28
29
from st2common.router import Router
30
from st2common.bootstrap import runnersregistrar as runners_registrar
31
from st2tests.base import DbTestCase
32
from st2tests.base import CleanDbTestCase
33
from st2tests import config as tests_config
34
35
__all__ = [
36
    'BaseFunctionalTest',
37
38
    'FunctionalTest',
39
    'APIControllerWithIncludeAndExcludeFilterTestCase',
40
    'BaseInquiryControllerTestCase',
41
42
    'FakeResponse',
43
    'TestApp'
44
]
45
46
47
SUPER_SECRET_PARAMETER = 'SUPER_SECRET_PARAMETER_THAT_SHOULD_NEVER_APPEAR_IN_RESPONSES_OR_LOGS'
48
ANOTHER_SUPER_SECRET_PARAMETER = 'ANOTHER_SUPER_SECRET_PARAMETER_TO_TEST_OVERRIDING'
49
50
51
class ResponseValidationError(ValueError):
    """
    Raised by ``TestApp.do_request`` when a response fails one of its sanity checks (a
    ``Warning`` header is present or the ``Access-Control-Allow-Origin`` header is missing).
    """
53
54
55
class ResponseLeakError(ValueError):
    """
    Raised by ``TestApp.do_request`` when a response body contains one of the secret marker
    values (``SUPER_SECRET_PARAMETER`` / ``ANOTHER_SUPER_SECRET_PARAMETER``).
    """
57
58
59
class TestApp(webtest.TestApp):
    """
    ``webtest.TestApp`` subclass which runs additional sanity checks on every request /
    response pair issued by the API tests.
    """

    def do_request(self, req, **kwargs):
        """
        Perform the request and validate the response before handing it back to the caller.

        For every non-OPTIONS request, an OPTIONS request is first issued against the same
        path.

        :param req: Request object to send.
        :param kwargs: Keyword arguments passed through to ``webtest.TestApp.do_request``
                       (``expect_errors`` is also consulted here).

        :raises ResponseValidationError: If the response contains a ``Warning`` header or is
                                         missing the ``Access-Control-Allow-Origin`` header.
        :raises ResponseLeakError: If ``expect_errors`` is not set and the response body
                                   contains one of the secret marker values.

        :return: The response object.
        """
        self.cookiejar.clear()

        if req.environ['REQUEST_METHOD'] != 'OPTIONS':
            # Making sure endpoint handles OPTIONS method properly
            self.options(req.environ['PATH_INFO'])

        res = super(TestApp, self).do_request(req, **kwargs)

        if res.headers.get('Warning'):
            raise ResponseValidationError('Endpoint produced invalid response. Make sure the '
                                          'response matches OpenAPI scheme for the endpoint.')

        if not kwargs.get('expect_errors'):
            try:
                body = res.body
            except AssertionError as e:
                # A body whose iterator has already been consumed and closed is treated as
                # empty; any other assertion failure is propagated with its original
                # traceback.
                if 'Iterator read after closed' not in six.text_type(e):
                    raise

                body = b''

            if six.b(SUPER_SECRET_PARAMETER) in body or \
                    six.b(ANOTHER_SUPER_SECRET_PARAMETER) in body:
                raise ResponseLeakError('Endpoint response contains secret parameter. '
                                        'Find the leak.')

        if 'Access-Control-Allow-Origin' not in res.headers:
            raise ResponseValidationError('Response missing a required CORS header')

        return res
91
92
93
class BaseFunctionalTest(DbTestCase):
    """
    Base test case class for testing API controllers with auth and RBAC disabled.
    """

    # App used by the tests (module which exposes "setup_app()")
    app_module = None

    # By default auth is disabled
    enable_auth = False

    # True to register runners as part of the class level set up
    register_runners = True

    @classmethod
    def setUpClass(cls):
        super(BaseFunctionalTest, cls).setUpClass()
        cls._do_setUpClass()

    def tearDown(self):
        super(BaseFunctionalTest, self).tearDown()

        # Reset mock context for API requests (only set if "use_user" has been called)
        if getattr(self, 'request_context_mock', None):
            self.request_context_mock.stop()

            if hasattr(Router, 'mock_context'):
                del Router.mock_context

    @classmethod
    def _do_setUpClass(cls):
        """
        Parse the test config, apply the auth / RBAC settings, optionally register runners
        and create the ``TestApp`` instance stored on ``cls.app``.
        """
        tests_config.parse_args()

        cfg.CONF.set_default('enable', cls.enable_auth, group='auth')

        cfg.CONF.set_override(name='enable', override=False, group='rbac')

        # TODO(manas) : register action types here for now. RunnerType registration can be moved
        # to posting to /runnertypes but that implies implementing POST.
        if cls.register_runners:
            runners_registrar.register_runners()

        cls.app = TestApp(cls.app_module.setup_app())

    def use_user(self, user_db):
        """
        Select a user which is to be used by the HTTP request following this call.

        :param user_db: User object to place in the mocked request context.

        :raises ValueError: If ``user_db`` is not provided.
        """
        if not user_db:
            raise ValueError('"user_db" is mandatory')

        mock_context = {
            'user': user_db,
            'auth_info': {
                'method': 'authentication token',
                'location': 'header'
            }
        }
        self.request_context_mock = mock.PropertyMock(return_value=mock_context)
        # Removed again inside tearDown
        Router.mock_context = self.request_context_mock
152
153
154
class FunctionalTest(BaseFunctionalTest):
    """
    Functional test case class which runs against the ``st2api`` app.
    """
    # Imported inside the class body so "app" does not become a module level name
    from st2api import app

    app_module = app
158
159
160
# pylint: disable=no-member
class APIControllerWithIncludeAndExcludeFilterTestCase(object):
    """
    Base class which is to be inherited from the API controller test cases which support
    ?exclude_attributes and ?include_attributes query param filters.

    The class relies on ``self.app`` and on the ``unittest.TestCase`` assertion methods, so it
    needs to be combined with a test case class which provides them.
    """

    # Controller get all path (e.g. "/v1/actions", /v1/rules, etc)
    get_all_path = None

    # API controller class
    controller_cls = None

    # Name of the model field to use with ?include_attributes (must not be one of the
    # mandatory include fields)
    include_attribute_field_name = None

    # Name of the model field to use with ?exclude_attributes
    exclude_attribute_field_name = None

    # True to assert that the object count in the response matches count returned by
    # _insert_mock_models method
    test_exact_object_count = True

    def test_get_all_exclude_attributes_and_include_attributes_are_mutually_exclusive(self):
        url = self.get_all_path + '?include_attributes=id&exclude_attributes=id'
        resp = self.app.get(url, expect_errors=True)
        self.assertEqual(resp.status_int, 400)
        expected_msg = ('exclude.*? and include.*? arguments are mutually exclusive. '
                        'You need to provide either one or another, but not both.')
        # six.assertRegex maps to assertRegexpMatches / assertRegex depending on the Python
        # version
        six.assertRegex(self, resp.json['faultstring'], expected_msg)

    def test_get_all_invalid_exclude_and_include_parameter(self):
        # 1. Invalid exclude_attributes field
        url = self.get_all_path + '?exclude_attributes=invalid_field'
        resp = self.app.get(url, expect_errors=True)

        expected_msg = ('Invalid or unsupported exclude attribute specified: .*invalid_field.*')
        self.assertEqual(resp.status_int, 400)
        six.assertRegex(self, resp.json['faultstring'], expected_msg)

        # 2. Invalid include_attributes field
        url = self.get_all_path + '?include_attributes=invalid_field'
        resp = self.app.get(url, expect_errors=True)

        expected_msg = ('Invalid or unsupported include attribute specified: .*invalid_field.*')
        self.assertEqual(resp.status_int, 400)
        six.assertRegex(self, resp.json['faultstring'], expected_msg)

    def test_get_all_include_attributes_filter(self):
        mandatory_include_fields = self.controller_cls.mandatory_include_fields_response

        # Create any resources needed by those tests (if not already created inside setUp /
        # setUpClass)
        object_ids = self._insert_mock_models()

        # Valid include attribute  - mandatory field which should always be included
        resp = self.app.get('%s?include_attributes=%s' % (self.get_all_path,
                                                          mandatory_include_fields[0]))

        self.assertEqual(resp.status_int, 200)
        self.assertGreaterEqual(len(resp.json), 1)

        if self.test_exact_object_count:
            self.assertEqual(len(resp.json), len(object_ids))

        self.assertEqual(len(resp.json[0].keys()), len(mandatory_include_fields))

        # Verify all mandatory fields are included
        for field in mandatory_include_fields:
            self.assertResponseObjectContainsField(resp.json[0], field)

        # Valid include attribute - not a mandatory field
        include_field = self.include_attribute_field_name
        # Use a TestCase assertion instead of a bare assert so the check is not stripped when
        # running with -O
        self.assertNotIn(include_field, mandatory_include_fields)

        resp = self.app.get('%s?include_attributes=%s' % (self.get_all_path, include_field))

        self.assertEqual(resp.status_int, 200)
        self.assertGreaterEqual(len(resp.json), 1)

        if self.test_exact_object_count:
            self.assertEqual(len(resp.json), len(object_ids))

        self.assertEqual(len(resp.json[0].keys()), len(mandatory_include_fields) + 1)

        for field in [include_field] + mandatory_include_fields:
            self.assertResponseObjectContainsField(resp.json[0], field)

        # Delete mock resources
        self._delete_mock_models(object_ids)

    def test_get_all_exclude_attributes_filter(self):
        # Create any resources needed by those tests (if not already created inside setUp /
        # setUpClass)
        object_ids = self._insert_mock_models()

        # Valid exclude attribute
        exclude_attribute = self.exclude_attribute_field_name

        # 1. Verify attribute is present when no filter is provided
        resp = self.app.get(self.get_all_path)

        self.assertEqual(resp.status_int, 200)
        self.assertGreaterEqual(len(resp.json), 1)

        if self.test_exact_object_count:
            self.assertEqual(len(resp.json), len(object_ids))

        self.assertIn(exclude_attribute, resp.json[0])

        # 2. Verify attribute is excluded when filter is provided
        resp = self.app.get('%s?exclude_attributes=%s' % (self.get_all_path,
                                                          exclude_attribute))

        self.assertEqual(resp.status_int, 200)
        self.assertGreaterEqual(len(resp.json), 1)

        if self.test_exact_object_count:
            self.assertEqual(len(resp.json), len(object_ids))

        self.assertNotIn(exclude_attribute, resp.json[0])

        self._delete_mock_models(object_ids)

    def assertResponseObjectContainsField(self, resp_item, field):
        """
        Assert that the response object contains the provided field.

        :param resp_item: Response object (item from the API response).
        :param field: Field name. "." separated names (e.g. "a.b.c") are treated as a path
                      into nested objects.
        """
        # Handle "." and nested fields - every part except the last one needs to be present
        # so we can descend into it. No loop variable is used after the loop.
        field_parts = field.split('.')

        for field_part in field_parts[:-1]:
            self.assertIn(field_part, resp_item)
            resp_item = resp_item[field_part]

        self.assertIn(field_parts[-1], resp_item)

    def _insert_mock_models(self):
        """
        Insert mock models used for get all filter tests.

        If the test class inserts mock models inside setUp / setUpClass method, this function
        should just return the ids of inserted models.
        """
        return []

    def _delete_mock_models(self, object_ids):
        """
        Delete mock models / objects used by get all filter tests.

        If the test class inserts mock models inside setUp / setUpClass method, this method should
        be overridden and made a no-op.
        """
        for object_id in object_ids:
            self._do_delete(object_id)

    def _do_delete(self, object_id):
        """
        Delete a single mock object. No-op by default.
        """
        pass
320
321
322
class FakeResponse(object):
    """
    Minimal stand-in for an HTTP response object which exposes ``text``, ``status_code`` and
    ``reason`` attributes plus ``json()`` and ``raise_for_status()`` methods.
    """

    def __init__(self, text, status_code, reason):
        """
        :param text: Response body (JSON serialized string if ``json()`` is to be used).
        :param status_code: Response status code.
        :param reason: Reason string, used as the message by ``raise_for_status()``.
        """
        self.text = text
        self.status_code = status_code
        self.reason = reason

    def json(self):
        """
        Return the response body parsed as JSON.
        """
        return json.loads(self.text)

    def raise_for_status(self):
        """
        Unconditionally raise an ``Exception`` with ``reason`` as the message (the status
        code is not consulted).
        """
        raise Exception(self.reason)
334
335
336
class BaseActionExecutionControllerTestCase(object):
    """
    Helper methods for tests which talk to the /v1/executions API endpoints.
    """

    # App used to issue the requests, needs to be set by the test class
    app = None

    @staticmethod
    def _get_actionexecution_id(resp):
        """
        Return the "id" attribute of the JSON response body.
        """
        return resp.json['id']

    @staticmethod
    def _get_liveaction_id(resp):
        """
        Return the "id" attribute of the "liveaction" object in the JSON response body.
        """
        return resp.json['liveaction']['id']

    def _do_get_one(self, actionexecution_id, *args, **kwargs):
        """
        GET /v1/executions/<actionexecution_id>. Additional arguments are passed to app.get.
        """
        return self.app.get('/v1/executions/%s' % actionexecution_id, *args, **kwargs)

    def _do_post(self, liveaction, *args, **kwargs):
        """
        POST the provided liveaction as JSON to /v1/executions.
        """
        return self.app.post_json('/v1/executions', liveaction, *args, **kwargs)

    def _do_delete(self, actionexecution_id, expect_errors=False):
        """
        DELETE /v1/executions/<actionexecution_id>.
        """
        return self.app.delete('/v1/executions/%s' % actionexecution_id,
                               expect_errors=expect_errors)

    def _do_put(self, actionexecution_id, updates, *args, **kwargs):
        """
        PUT the provided updates as JSON to /v1/executions/<actionexecution_id>.
        """
        return self.app.put_json('/v1/executions/%s' % actionexecution_id, updates, *args, **kwargs)
359
360
361
class BaseInquiryControllerTestCase(BaseFunctionalTest, CleanDbTestCase):
    """
    Base class for non-RBAC tests for Inquiry API

    Inherits from CleanDbTestCase to preserve atomicity between tests
    """
    from st2api import app

    enable_auth = False
    app_module = app

    @staticmethod
    def _get_inquiry_id(resp):
        """
        Return the "id" attribute of the JSON response body.
        """
        return resp.json['id']

    def _do_get_execution(self, actionexecution_id, *args, **kwargs):
        """
        GET /v1/executions/<actionexecution_id>.
        """
        return self.app.get('/v1/executions/%s' % actionexecution_id, *args, **kwargs)

    def _do_get_one(self, inquiry_id, *args, **kwargs):
        """
        GET /v1/inquiries/<inquiry_id>.
        """
        return self.app.get('/v1/inquiries/%s' % inquiry_id, *args, **kwargs)

    def _do_get_all(self, limit=50, *args, **kwargs):
        """
        GET /v1/inquiries/ with the provided ?limit query parameter value.
        """
        return self.app.get('/v1/inquiries/?limit=%s' % limit, *args, **kwargs)

    def _do_respond(self, inquiry_id, response, *args, **kwargs):
        """
        PUT the provided response for the inquiry to /v1/inquiries/<inquiry_id>.
        """
        payload = {
            "id": inquiry_id,
            "response": response
        }
        return self.app.put_json('/v1/inquiries/%s' % inquiry_id, payload, *args, **kwargs)

    def _do_create_inquiry(self, liveaction, result, status='pending', *args, **kwargs):
        """
        POST the liveaction to /v1/executions and then PUT the provided status and result to
        the execution which was created. The response of the PUT request is returned.
        """
        post_resp = self.app.post_json('/v1/executions', liveaction, *args, **kwargs)
        # The id comes from the POST /v1/executions response
        inquiry_id = self._get_inquiry_id(post_resp)
        updates = {'status': status, 'result': result}
        return self.app.put_json('/v1/executions/%s' % inquiry_id, updates, *args, **kwargs)
397