Test Failed
Push — master ( e380d0...f5671d )
by W
02:58
created

st2tests/st2tests/api.py (1 issue)

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
"""
17
Various base classes and test utility functions for API related tests.
18
"""
19
20
from __future__ import absolute_import
21
22
import six
23
import webtest
24
import mock
25
from oslo_config import cfg
26
27
from st2common.router import Router
28
from st2common.rbac.types import SystemRole
29
from st2common.persistence.auth import User
30
from st2common.persistence.rbac import UserRoleAssignment
31
from st2common.models.db.auth import UserDB
32
from st2common.models.db.rbac import UserRoleAssignmentDB
33
from st2common.rbac.migrations import run_all as run_all_rbac_migrations
34
from st2common.bootstrap import runnersregistrar as runners_registrar
35
from st2tests.base import DbTestCase
36
from st2tests.base import CleanDbTestCase
37
from st2tests import config as tests_config
38
39
__all__ = [
40
    'BaseFunctionalTest',
41
    'BaseAPIControllerWithRBACTestCase',
42
43
    'TestApp'
44
]
45
46
47
SUPER_SECRET_PARAMETER = 'SUPER_SECRET_PARAMETER_THAT_SHOULD_NEVER_APPEAR_IN_RESPONSES_OR_LOGS'
48
ANOTHER_SUPER_SECRET_PARAMETER = 'ANOTHER_SUPER_SECRET_PARAMETER_TO_TEST_OVERRIDING'
49
50
51
class ResponseValidationError(ValueError):
52
    pass
53
54
55
class ResponseLeakError(ValueError):
56
    pass
57
58
59
class TestApp(webtest.TestApp):
60
    def do_request(self, req, **kwargs):
61
        self.cookiejar.clear()
62
63
        if req.environ['REQUEST_METHOD'] != 'OPTIONS':
64
            # Making sure endpoint handles OPTIONS method properly
65
            self.options(req.environ['PATH_INFO'])
66
67
        res = super(TestApp, self).do_request(req, **kwargs)
68
69
        if res.headers.get('Warning', None):
70
            raise ResponseValidationError('Endpoint produced invalid response. Make sure the '
71
                                          'response matches OpenAPI scheme for the endpoint.')
72
73
        if not kwargs.get('expect_errors', None):
74
            try:
75
                body = res.body
76
            except AssertionError as e:
77
                if 'Iterator read after closed' in str(e):
78
                    body = b''
79
                else:
80
                    raise e
81
82
            if six.b(SUPER_SECRET_PARAMETER) in body or \
83
                    six.b(ANOTHER_SUPER_SECRET_PARAMETER) in body:
84
                raise ResponseLeakError('Endpoint response contains secret parameter. '
85
                                        'Find the leak.')
86
87
        if 'Access-Control-Allow-Origin' not in res.headers:
88
            raise ResponseValidationError('Response missing a required CORS header')
89
90
        return res
91
92
93
class BaseFunctionalTest(DbTestCase):
94
    """
95
    Base test case class for testing API controllers with auth and RBAC disabled.
96
    """
97
98
    # App used by the tests
99
    app_module = None
100
101
    # By default auth is disabled
102
    enable_auth = False
103
104
    register_runners = True
105
106
    @classmethod
107
    def setUpClass(cls):
108
        super(BaseFunctionalTest, cls).setUpClass()
109
        cls._do_setUpClass()
110
111
    @classmethod
112
    def _do_setUpClass(cls):
113
        tests_config.parse_args()
114
115
        cfg.CONF.set_default('enable', cls.enable_auth, group='auth')
116
117
        cfg.CONF.set_override(name='enable', override=False, group='rbac')
118
119
        # TODO(manas) : register action types here for now. RunnerType registration can be moved
120
        # to posting to /runnertypes but that implies implementing POST.
121
        if cls.register_runners:
122
            runners_registrar.register_runners()
123
124
        cls.app = TestApp(cls.app_module.setup_app())
125
126
127
class BaseAPIControllerWithRBACTestCase(BaseFunctionalTest, CleanDbTestCase):
128
    """
129
    Base test case class for testing API controllers with RBAC enabled.
130
    """
131
132
    enable_auth = True
133
134
    @classmethod
135
    def setUpClass(cls):
136
        super(BaseAPIControllerWithRBACTestCase, cls).setUpClass()
137
138
        # Make sure RBAC is enabeld
139
        cfg.CONF.set_override(name='enable', override=True, group='rbac')
140
141
    @classmethod
142
    def tearDownClass(cls):
143
        super(BaseAPIControllerWithRBACTestCase, cls).tearDownClass()
144
145
    def setUp(self):
146
        super(BaseAPIControllerWithRBACTestCase, self).setUp()
147
148
        self.users = {}
149
        self.roles = {}
150
151
        # Run RBAC migrations
152
        run_all_rbac_migrations()
153
154
        # Insert mock users with default role assignments
155
        role_names = [SystemRole.SYSTEM_ADMIN, SystemRole.ADMIN, SystemRole.OBSERVER]
156
        for role_name in role_names:
157
            user_db = UserDB(name=role_name)
158
            user_db = User.add_or_update(user_db)
159
            self.users[role_name] = user_db
160
161
            role_assignment_db = UserRoleAssignmentDB(
162
                user=user_db.name, role=role_name,
163
                source='assignments/%s.yaml' % user_db.name)
164
            UserRoleAssignment.add_or_update(role_assignment_db)
165
166
        # Insert a user with no permissions and role assignments
167
        user_1_db = UserDB(name='no_permissions')
168
        user_1_db = User.add_or_update(user_1_db)
169
        self.users['no_permissions'] = user_1_db
170
171
        # Insert special system user
172
        user_2_db = UserDB(name='system_user')
173
        user_2_db = User.add_or_update(user_2_db)
174
        self.users['system_user'] = user_2_db
175
176
        role_assignment_db = UserRoleAssignmentDB(
177
            user=user_2_db.name, role=SystemRole.ADMIN,
178
            source='assignments/%s.yaml' % user_2_db.name)
179
        UserRoleAssignment.add_or_update(role_assignment_db)
180
181
    def tearDown(self):
182
        super(BaseAPIControllerWithRBACTestCase, self).tearDown()
183
184
        if getattr(self, 'request_context_mock', None):
185
            self.request_context_mock.stop()
186
            del(Router.mock_context)
0 ignored issues
show
Unused Code Coding Style introduced by
There is an unnecessary parenthesis after del.
Loading history...
187
188
    def use_user(self, user_db):
189
        """
190
        Select a user which is to be used by the HTTP request following this call.
191
        """
192
        if not user_db:
193
            raise ValueError('"user_db" is mandatory')
194
195
        mock_context = {
196
            'user': user_db,
197
            'auth_info': {
198
                'method': 'authentication token',
199
                'location': 'header'
200
            }
201
        }
202
        self.request_context_mock = mock.PropertyMock(return_value=mock_context)
203
        Router.mock_context = self.request_context_mock
204