@@ 159-203 (lines=45) @@ | ||
156 | """ |
|
157 | return None |
|
158 | ||
def _get_config_file_name(self) -> str:
    """
    Generate the TabPy config file used by the test.

    Override this function for tests that need to run against a
    non-default config or state file.

    The file always contains the query object path, port and state
    path. The password file, transfer protocol, certificate file, key
    file and evaluate timeout settings are written only when the
    corresponding ``_get_*`` hook returns something other than ``None``.

    Returns
    -------
    str
        Path to the generated config file (``test.conf`` inside
        ``self.tmp_dir``).
    """
    config_path = os.path.join(self.tmp_dir, "test.conf")

    # Use a context manager so the handle is closed even if one of the
    # hook methods below raises part-way through writing the file.
    with open(config_path, "w+") as config_file:
        config_file.write(
            "[TabPy]\n"
            f"TABPY_QUERY_OBJECT_PATH = {self.tmp_dir}/query_objects\n"
            f"TABPY_PORT = {self._get_port()}\n"
            f"TABPY_STATE_PATH = {self.tmp_dir}\n"
        )

        pwd_file = self._get_pwd_file()
        if pwd_file is not None:
            # File locations are written as absolute paths.
            pwd_file = os.path.abspath(pwd_file)
            config_file.write(f"TABPY_PWD_FILE = {pwd_file}\n")

        transfer_protocol = self._get_transfer_protocol()
        if transfer_protocol is not None:
            config_file.write(f"TABPY_TRANSFER_PROTOCOL = {transfer_protocol}\n")

        cert_file_name = self._get_certificate_file_name()
        if cert_file_name is not None:
            cert_file_name = os.path.abspath(cert_file_name)
            config_file.write(f"TABPY_CERTIFICATE_FILE = {cert_file_name}\n")

        key_file_name = self._get_key_file_name()
        if key_file_name is not None:
            key_file_name = os.path.abspath(key_file_name)
            config_file.write(f"TABPY_KEY_FILE = {key_file_name}\n")

        evaluate_timeout = self._get_evaluate_timeout()
        if evaluate_timeout is not None:
            config_file.write(f"TABPY_EVALUATE_TIMEOUT = {evaluate_timeout}\n")

    # Set only after the file has been written and closed successfully.
    self.delete_config_file = True
    return config_path
|
204 | ||
205 | def setUp(self): |
|
206 | super(IntegTestBase, self).setUp() |
@@ 13-58 (lines=46) @@ | ||
10 | ||
11 | ||
12 | class TestEvaluate(integ_test_base.IntegTestBase): |
|
def _get_config_file_name(self) -> str:
    """
    Generate the TabPy config file used by the test, with gzip enabled.

    Writes the query object path, port, ``TABPY_GZIP_ENABLE = TRUE``
    and state path unconditionally. The password file, transfer
    protocol, certificate file, key file and evaluate timeout settings
    are written only when the corresponding ``_get_*`` hook returns
    something other than ``None``.

    Returns
    -------
    str
        Path to the generated config file (``test.conf`` inside
        ``self.tmp_dir``).
    """
    config_path = os.path.join(self.tmp_dir, "test.conf")

    # Use a context manager so the handle is closed even if one of the
    # hook methods below raises part-way through writing the file.
    with open(config_path, "w+") as config_file:
        config_file.write(
            "[TabPy]\n"
            f"TABPY_QUERY_OBJECT_PATH = {self.tmp_dir}/query_objects\n"
            f"TABPY_PORT = {self._get_port()}\n"
            "TABPY_GZIP_ENABLE = TRUE\n"
            f"TABPY_STATE_PATH = {self.tmp_dir}\n"
        )

        pwd_file = self._get_pwd_file()
        if pwd_file is not None:
            # File locations are written as absolute paths.
            pwd_file = os.path.abspath(pwd_file)
            config_file.write(f"TABPY_PWD_FILE = {pwd_file}\n")

        transfer_protocol = self._get_transfer_protocol()
        if transfer_protocol is not None:
            config_file.write(f"TABPY_TRANSFER_PROTOCOL = {transfer_protocol}\n")

        cert_file_name = self._get_certificate_file_name()
        if cert_file_name is not None:
            cert_file_name = os.path.abspath(cert_file_name)
            config_file.write(f"TABPY_CERTIFICATE_FILE = {cert_file_name}\n")

        key_file_name = self._get_key_file_name()
        if key_file_name is not None:
            key_file_name = os.path.abspath(key_file_name)
            config_file.write(f"TABPY_KEY_FILE = {key_file_name}\n")

        evaluate_timeout = self._get_evaluate_timeout()
        if evaluate_timeout is not None:
            config_file.write(f"TABPY_EVALUATE_TIMEOUT = {evaluate_timeout}\n")

    # Set only after the file has been written and closed successfully.
    self.delete_config_file = True
    return config_path
|
59 | ||
60 | def test_single_value_returned(self): |
|
61 | payload = """ |