1 | """ |
||
2 | Script evaluation tests. |
||
3 | """ |
||
4 | |||
5 | from . import integ_test_base |
||
6 | import json |
||
7 | import gzip |
||
8 | import os |
||
9 | import requests |
||
10 | |||
11 | |||
12 | class TestEvaluate(integ_test_base.IntegTestBase): |
||
13 | View Code Duplication | def _get_config_file_name(self) -> str: |
|
0 ignored issues
–
show
Duplication
introduced
by
![]() |
|||
14 | """ |
||
15 | Generates config file. Overwrite this function for tests to |
||
16 | run against not default state file. |
||
17 | |||
18 | Returns |
||
19 | ------- |
||
20 | str |
||
21 | Absolute path to config file. |
||
22 | """ |
||
23 | config_file = open(os.path.join(self.tmp_dir, "test.conf"), "w+") |
||
24 | config_file.write( |
||
25 | "[TabPy]\n" |
||
26 | f"TABPY_QUERY_OBJECT_PATH = {self.tmp_dir}/query_objects\n" |
||
27 | f"TABPY_PORT = {self._get_port()}\n" |
||
28 | f"TABPY_GZIP_ENABLE = TRUE\n" |
||
29 | f"TABPY_STATE_PATH = {self.tmp_dir}\n" |
||
30 | ) |
||
31 | |||
32 | pwd_file = self._get_pwd_file() |
||
33 | if pwd_file is not None: |
||
34 | pwd_file = os.path.abspath(pwd_file) |
||
35 | config_file.write(f"TABPY_PWD_FILE = {pwd_file}\n") |
||
36 | |||
37 | transfer_protocol = self._get_transfer_protocol() |
||
38 | if transfer_protocol is not None: |
||
39 | config_file.write(f"TABPY_TRANSFER_PROTOCOL = {transfer_protocol}\n") |
||
40 | |||
41 | cert_file_name = self._get_certificate_file_name() |
||
42 | if cert_file_name is not None: |
||
43 | cert_file_name = os.path.abspath(cert_file_name) |
||
44 | config_file.write(f"TABPY_CERTIFICATE_FILE = {cert_file_name}\n") |
||
45 | |||
46 | key_file_name = self._get_key_file_name() |
||
47 | if key_file_name is not None: |
||
48 | key_file_name = os.path.abspath(key_file_name) |
||
49 | config_file.write(f"TABPY_KEY_FILE = {key_file_name}\n") |
||
50 | |||
51 | evaluate_timeout = self._get_evaluate_timeout() |
||
52 | if evaluate_timeout is not None: |
||
53 | config_file.write(f"TABPY_EVALUATE_TIMEOUT = {evaluate_timeout}\n") |
||
54 | |||
55 | config_file.close() |
||
56 | |||
57 | self.delete_config_file = True |
||
58 | return config_file.name |
||
59 | |||
60 | def test_single_value_returned(self): |
||
61 | payload = """ |
||
62 | { |
||
63 | "data": { "_arg1": 2, "_arg2": 40 }, |
||
64 | "script": |
||
65 | "return _arg1 + _arg2" |
||
66 | } |
||
67 | """ |
||
68 | headers = { |
||
69 | "Content-Type": "application/json", |
||
70 | "Content-Encoding": "gzip", |
||
71 | } |
||
72 | |||
73 | url = self._get_url() + "/evaluate" |
||
74 | response = requests.request("POST", url, data=gzip.compress(payload.encode('utf-8')), |
||
75 | headers=headers) |
||
76 | result = json.loads(response.text) |
||
77 | |||
78 | self.assertEqual(200, response.status_code) |
||
79 | self.assertEqual(42, result) |
||
80 | |||
81 | def test_syntax_error(self): |
||
82 | payload = """ |
||
83 | { |
||
84 | "data": { "_arg1": [2], "_arg2": [40] }, |
||
85 | "script": |
||
86 | "% ^ !! return Nothing" |
||
87 | } |
||
88 | """ |
||
89 | headers = { |
||
90 | "Content-Type": "application/json", |
||
91 | "Content-Encoding": "gzip", |
||
92 | } |
||
93 | |||
94 | url = self._get_url() + "/evaluate" |
||
95 | response = requests.request("POST", url, data=gzip.compress(payload.encode('utf-8')), |
||
96 | headers=headers) |
||
97 | result = json.loads(response.text) |
||
98 | |||
99 | self.assertEqual(500, response.status_code) |
||
100 | self.assertEqual("Error processing script", result["message"]) |
||
101 | self.assertTrue(result["info"].startswith("SyntaxError")) |
||
102 |