Passed
Pull Request — master (#543)
by
unknown
13:58 queued 13s
created

tests.integration.test_gzip   A

Complexity

Total Complexity 8

Size/Duplication

Total Lines 99
Duplicated Lines 46.46 %

Importance

Changes 0
Metric Value
wmc 8
eloc 53
dl 46
loc 99
rs 10
c 0
b 0
f 0

3 Methods

Rating   Name   Duplication   Size   Complexity  
A TestEvaluate.test_single_value_returned() 0 19 1
B TestEvaluate._get_config_file_name() 46 46 6
A TestEvaluate.test_syntax_error() 0 20 1

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
"""
2
Script evaluation tests.
3
"""
4
5
from . import integ_test_base
6
import json
7
import gzip
8
import os
9
import requests
10
11
class TestEvaluate(integ_test_base.IntegTestBase):
12 View Code Duplication
    def _get_config_file_name(self) -> str:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
13
        """
14
        Generates config file. Overwrite this function for tests to
15
        run against not default state file.
16
17
        Returns
18
        -------
19
        str
20
            Absolute path to config file.
21
        """
22
        config_file = open(os.path.join(self.tmp_dir, "test.conf"), "w+")
23
        config_file.write(
24
            "[TabPy]\n"
25
            f"TABPY_QUERY_OBJECT_PATH = {self.tmp_dir}/query_objects\n"
26
            f"TABPY_PORT = {self._get_port()}\n"
27
            f"TABPY_GZIP_ENABLE = TRUE\n"
28
            f"TABPY_STATE_PATH = {self.tmp_dir}\n"
29
        )
30
31
        pwd_file = self._get_pwd_file()
32
        if pwd_file is not None:
33
            pwd_file = os.path.abspath(pwd_file)
34
            config_file.write(f"TABPY_PWD_FILE = {pwd_file}\n")
35
36
        transfer_protocol = self._get_transfer_protocol()
37
        if transfer_protocol is not None:
38
            config_file.write(f"TABPY_TRANSFER_PROTOCOL = {transfer_protocol}\n")
39
40
        cert_file_name = self._get_certificate_file_name()
41
        if cert_file_name is not None:
42
            cert_file_name = os.path.abspath(cert_file_name)
43
            config_file.write(f"TABPY_CERTIFICATE_FILE = {cert_file_name}\n")
44
45
        key_file_name = self._get_key_file_name()
46
        if key_file_name is not None:
47
            key_file_name = os.path.abspath(key_file_name)
48
            config_file.write(f"TABPY_KEY_FILE = {key_file_name}\n")
49
50
        evaluate_timeout = self._get_evaluate_timeout()
51
        if evaluate_timeout is not None:
52
            config_file.write(f"TABPY_EVALUATE_TIMEOUT = {evaluate_timeout}\n")
53
54
        config_file.close()
55
56
        self.delete_config_file = True
57
        return config_file.name
58
59
    def test_single_value_returned(self):
60
        payload = """
61
            {
62
                "data": { "_arg1": 2, "_arg2": 40 },
63
                "script":
64
                "return _arg1 + _arg2"
65
            }
66
            """
67
        headers = {
68
            "Content-Type": "application/json",
69
            "Content-Encoding": "gzip",
70
        }
71
72
        url = self._get_url() + "/evaluate"
73
        response = requests.request("POST", url, data=gzip.compress(payload.encode('utf-8')), headers=headers)
74
        result = json.loads(response.text)
75
76
        self.assertEqual(200, response.status_code)
77
        self.assertEqual(42, result)
78
79
    def test_syntax_error(self):
80
        payload = """
81
            {
82
                "data": { "_arg1": [2], "_arg2": [40] },
83
                "script":
84
                "% ^ !! return Nothing"
85
            }
86
            """
87
        headers = {
88
            "Content-Type": "application/json",
89
            "Content-Encoding": "gzip",
90
        }
91
92
        url = self._get_url() + "/evaluate"
93
        response = requests.request("POST", url, data=gzip.compress(payload.encode('utf-8')), headers=headers)
94
        result = json.loads(response.text)
95
96
        self.assertEqual(500, response.status_code)
97
        self.assertEqual("Error processing script", result["message"])
98
        self.assertTrue(result["info"].startswith("SyntaxError"))
99