1 | # Licensed to the StackStorm, Inc ('StackStorm') under one or more |
||
2 | # contributor license agreements. See the NOTICE file distributed with |
||
3 | # this work for additional information regarding copyright ownership. |
||
4 | # The ASF licenses this file to You under the Apache License, Version 2.0 |
||
5 | # (the "License"); you may not use this file except in compliance with |
||
6 | # the License. You may obtain a copy of the License at |
||
7 | # |
||
8 | # http://www.apache.org/licenses/LICENSE-2.0 |
||
9 | # |
||
10 | # Unless required by applicable law or agreed to in writing, software |
||
11 | # distributed under the License is distributed on an "AS IS" BASIS, |
||
12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||
13 | # See the License for the specific language governing permissions and |
||
14 | # limitations under the License. |
||
15 | |||
16 | """ |
||
17 | Various base classes and test utility functions for API related tests. |
||
18 | """ |
||
19 | |||
20 | from __future__ import absolute_import |
||
21 | |||
22 | import six |
||
23 | import webtest |
||
24 | import mock |
||
25 | from oslo_config import cfg |
||
26 | |||
27 | from st2common.router import Router |
||
28 | from st2common.rbac.types import SystemRole |
||
29 | from st2common.persistence.auth import User |
||
30 | from st2common.persistence.rbac import UserRoleAssignment |
||
31 | from st2common.models.db.auth import UserDB |
||
32 | from st2common.models.db.rbac import UserRoleAssignmentDB |
||
33 | from st2common.rbac.migrations import run_all as run_all_rbac_migrations |
||
34 | from st2common.bootstrap import runnersregistrar as runners_registrar |
||
35 | from st2tests.base import DbTestCase |
||
36 | from st2tests.base import CleanDbTestCase |
||
37 | from st2tests import config as tests_config |
||
38 | |||
# Public names exported by a star-import of this module.
__all__ = [
    'BaseFunctionalTest',
    'BaseAPIControllerWithRBACTestCase',
    'TestApp',
]
||
45 | |||
46 | |||
# Marker values which must never show up in an API response body.
# TestApp.do_request raises ResponseLeakError when either of them is found in a response
# (the check is skipped for requests sent with expect_errors set).
SUPER_SECRET_PARAMETER = 'SUPER_SECRET_PARAMETER_THAT_SHOULD_NEVER_APPEAR_IN_RESPONSES_OR_LOGS'
ANOTHER_SUPER_SECRET_PARAMETER = 'ANOTHER_SUPER_SECRET_PARAMETER_TO_TEST_OVERRIDING'
||
49 | |||
50 | |||
class ResponseValidationError(ValueError):
    """
    Raised by TestApp.do_request when a response fails one of its validation checks
    (a "Warning" header is present or the "Access-Control-Allow-Origin" header is missing).
    """
||
53 | |||
54 | |||
class ResponseLeakError(ValueError):
    """
    Raised by TestApp.do_request when a response body contains one of the secret marker
    values (SUPER_SECRET_PARAMETER or ANOTHER_SUPER_SECRET_PARAMETER).
    """
||
57 | |||
58 | |||
class TestApp(webtest.TestApp):
    """
    ``webtest.TestApp`` subclass which performs additional sanity checks on every request.

    In addition to sending the request, it also:

    * clears the cookie jar before each request
    * issues an OPTIONS request against the same path for every non-OPTIONS request
    * rejects responses which carry a "Warning" header
    * rejects responses which contain one of the secret marker values (unless the caller
      passed ``expect_errors``)
    * rejects responses without an "Access-Control-Allow-Origin" header
    """

    def do_request(self, req, **kwargs):
        """
        Send the request and validate the response before handing it back to the caller.

        :param req: Request object to send. Method and path are read from ``req.environ``.
        :param kwargs: Keyword arguments which are passed through to
                       ``webtest.TestApp.do_request``. ``expect_errors`` is also inspected
                       here to decide if the secret leak check should run.

        :raises ResponseValidationError: If the response contains a "Warning" header or is
                                         missing the "Access-Control-Allow-Origin" header.
        :raises ResponseLeakError: If the response body contains a secret marker value.

        :return: Response object returned by the parent class.
        """
        self.cookiejar.clear()

        if req.environ['REQUEST_METHOD'] != 'OPTIONS':
            # Making sure endpoint handles OPTIONS method properly
            self.options(req.environ['PATH_INFO'])

        res = super(TestApp, self).do_request(req, **kwargs)

        if res.headers.get('Warning', None):
            raise ResponseValidationError('Endpoint produced invalid response. Make sure the '
                                          'response matches OpenAPI scheme for the endpoint.')

        if not kwargs.get('expect_errors', None):
            try:
                body = res.body
            except AssertionError as e:
                if 'Iterator read after closed' not in str(e):
                    # Bare raise so the original traceback is preserved (``raise e`` resets it
                    # on Python 2)
                    raise

                # Body iterator was already consumed and closed so there is nothing to scan
                body = b''

            if six.b(SUPER_SECRET_PARAMETER) in body or \
                    six.b(ANOTHER_SUPER_SECRET_PARAMETER) in body:
                raise ResponseLeakError('Endpoint response contains secret parameter. '
                                        'Find the leak.')

        if 'Access-Control-Allow-Origin' not in res.headers:
            raise ResponseValidationError('Response missing a required CORS header')

        return res
||
91 | |||
92 | |||
class BaseFunctionalTest(DbTestCase):
    """
    Base class for API controller tests which run with auth and RBAC turned off.

    Class level set up parses the test config, applies the auth / RBAC settings, optionally
    registers runners and stores a ``TestApp`` instance wrapping the application on
    ``cls.app``.
    """

    # Module which provides ``setup_app()`` for the app under test (set by the subclass)
    app_module = None

    # Value used as a default for the "enable" option in the "auth" config group
    # (auth is disabled by default)
    enable_auth = False

    # True to register runners as part of the class set up
    register_runners = True

    @classmethod
    def setUpClass(cls):
        """
        Run the parent class set up followed by the API specific set up.
        """
        super(BaseFunctionalTest, cls).setUpClass()
        cls._do_setUpClass()

    @classmethod
    def _do_setUpClass(cls):
        """
        Parse the test config, configure auth and RBAC and build the test application.
        """
        tests_config.parse_args()

        conf = cfg.CONF
        conf.set_default('enable', cls.enable_auth, group='auth')
        conf.set_override(name='enable', override=False, group='rbac')

        # TODO(manas) : register action types here for now. RunnerType registration can be moved
        # to posting to /runnertypes but that implies implementing POST.
        if cls.register_runners:
            runners_registrar.register_runners()

        wsgi_app = cls.app_module.setup_app()
        cls.app = TestApp(wsgi_app)
||
125 | |||
126 | |||
class BaseAPIControllerWithRBACTestCase(BaseFunctionalTest, CleanDbTestCase):
    """
    Base test case class for testing API controllers with RBAC enabled.

    ``setUp`` runs the RBAC migrations and populates ``self.users`` with the following mock
    users:

    * one user per system role (system admin, admin, observer), keyed and named by the role
      name, each with a matching role assignment
    * "no_permissions" - user without any role assignment
    * "system_user" - user with the admin role assigned
    """

    enable_auth = True

    @classmethod
    def setUpClass(cls):
        """
        Run the base class set up and then turn RBAC on.
        """
        super(BaseAPIControllerWithRBACTestCase, cls).setUpClass()

        # Make sure RBAC is enabled (the base class set up overrides it to False)
        cfg.CONF.set_override(name='enable', override=True, group='rbac')

    @classmethod
    def tearDownClass(cls):
        super(BaseAPIControllerWithRBACTestCase, cls).tearDownClass()

    def setUp(self):
        """
        Run RBAC migrations and insert the mock users and role assignments.
        """
        super(BaseAPIControllerWithRBACTestCase, self).setUp()

        self.users = {}
        self.roles = {}

        # Run RBAC migrations
        run_all_rbac_migrations()

        # Insert mock users with default role assignments
        role_names = [SystemRole.SYSTEM_ADMIN, SystemRole.ADMIN, SystemRole.OBSERVER]
        for role_name in role_names:
            user_db = UserDB(name=role_name)
            user_db = User.add_or_update(user_db)
            self.users[role_name] = user_db

            role_assignment_db = UserRoleAssignmentDB(
                user=user_db.name, role=role_name,
                source='assignments/%s.yaml' % user_db.name)
            UserRoleAssignment.add_or_update(role_assignment_db)

        # Insert a user with no permissions and role assignments
        user_1_db = UserDB(name='no_permissions')
        user_1_db = User.add_or_update(user_1_db)
        self.users['no_permissions'] = user_1_db

        # Insert special system user
        user_2_db = UserDB(name='system_user')
        user_2_db = User.add_or_update(user_2_db)
        self.users['system_user'] = user_2_db

        role_assignment_db = UserRoleAssignmentDB(
            user=user_2_db.name, role=SystemRole.ADMIN,
            source='assignments/%s.yaml' % user_2_db.name)
        UserRoleAssignment.add_or_update(role_assignment_db)

    def tearDown(self):
        """
        Run the parent tear down and remove the mocked request context from the Router class.
        """
        try:
            super(BaseAPIControllerWithRBACTestCase, self).tearDown()
        finally:
            # Router.mock_context is set on the class (see use_user) so it needs to be removed
            # even when the parent tear down fails, otherwise it would leak into other tests.
            if getattr(self, 'request_context_mock', None):
                # NOTE(review): request_context_mock is a PropertyMock and not a patcher so
                # stop() looks like a no-op here - confirm before removing it.
                self.request_context_mock.stop()
                del Router.mock_context

    def use_user(self, user_db):
        """
        Select a user which is to be used by the HTTP request following this call.

        The user is exposed through a mocked request context which is stored on
        ``Router.mock_context`` and removed again in ``tearDown``.

        :param user_db: User to use for the requests which follow.

        :raises ValueError: If ``user_db`` is not provided.
        """
        if not user_db:
            raise ValueError('"user_db" is mandatory')

        mock_context = {
            'user': user_db,
            'auth_info': {
                'method': 'authentication token',
                'location': 'header'
            }
        }
        self.request_context_mock = mock.PropertyMock(return_value=mock_context)
        Router.mock_context = self.request_context_mock
||
204 |