tests.integration.test_gzip   A
last analyzed

Complexity

Total Complexity 8

Size/Duplication

Total Lines 102
Duplicated Lines 45.1 %

Importance

Changes 0
Metric Value
wmc 8
eloc 55
dl 46
loc 102
rs 10
c 0
b 0
f 0

3 Methods

Rating   Name   Duplication   Size   Complexity  
A TestEvaluate.test_single_value_returned() 0 20 1
B TestEvaluate._get_config_file_name() 46 46 6
A TestEvaluate.test_syntax_error() 0 21 1

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
"""
2
Script evaluation tests.
3
"""
4
5
from . import integ_test_base
6
import json
7
import gzip
8
import os
9
import requests
10
11
12
class TestEvaluate(integ_test_base.IntegTestBase):
13 View Code Duplication
    def _get_config_file_name(self) -> str:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
14
        """
15
        Generates config file. Overwrite this function for tests to
16
        run against not default state file.
17
18
        Returns
19
        -------
20
        str
21
            Absolute path to config file.
22
        """
23
        config_file = open(os.path.join(self.tmp_dir, "test.conf"), "w+")
24
        config_file.write(
25
            "[TabPy]\n"
26
            f"TABPY_QUERY_OBJECT_PATH = {self.tmp_dir}/query_objects\n"
27
            f"TABPY_PORT = {self._get_port()}\n"
28
            f"TABPY_GZIP_ENABLE = TRUE\n"
29
            f"TABPY_STATE_PATH = {self.tmp_dir}\n"
30
        )
31
32
        pwd_file = self._get_pwd_file()
33
        if pwd_file is not None:
34
            pwd_file = os.path.abspath(pwd_file)
35
            config_file.write(f"TABPY_PWD_FILE = {pwd_file}\n")
36
37
        transfer_protocol = self._get_transfer_protocol()
38
        if transfer_protocol is not None:
39
            config_file.write(f"TABPY_TRANSFER_PROTOCOL = {transfer_protocol}\n")
40
41
        cert_file_name = self._get_certificate_file_name()
42
        if cert_file_name is not None:
43
            cert_file_name = os.path.abspath(cert_file_name)
44
            config_file.write(f"TABPY_CERTIFICATE_FILE = {cert_file_name}\n")
45
46
        key_file_name = self._get_key_file_name()
47
        if key_file_name is not None:
48
            key_file_name = os.path.abspath(key_file_name)
49
            config_file.write(f"TABPY_KEY_FILE = {key_file_name}\n")
50
51
        evaluate_timeout = self._get_evaluate_timeout()
52
        if evaluate_timeout is not None:
53
            config_file.write(f"TABPY_EVALUATE_TIMEOUT = {evaluate_timeout}\n")
54
55
        config_file.close()
56
57
        self.delete_config_file = True
58
        return config_file.name
59
60
    def test_single_value_returned(self):
61
        payload = """
62
            {
63
                "data": { "_arg1": 2, "_arg2": 40 },
64
                "script":
65
                "return _arg1 + _arg2"
66
            }
67
            """
68
        headers = {
69
            "Content-Type": "application/json",
70
            "Content-Encoding": "gzip",
71
        }
72
73
        url = self._get_url() + "/evaluate"
74
        response = requests.request("POST", url, data=gzip.compress(payload.encode('utf-8')),
75
            headers=headers)
76
        result = json.loads(response.text)
77
78
        self.assertEqual(200, response.status_code)
79
        self.assertEqual(42, result)
80
81
    def test_syntax_error(self):
82
        payload = """
83
            {
84
                "data": { "_arg1": [2], "_arg2": [40] },
85
                "script":
86
                "% ^ !! return Nothing"
87
            }
88
            """
89
        headers = {
90
            "Content-Type": "application/json",
91
            "Content-Encoding": "gzip",
92
        }
93
94
        url = self._get_url() + "/evaluate"
95
        response = requests.request("POST", url, data=gzip.compress(payload.encode('utf-8')),
96
            headers=headers)
97
        result = json.loads(response.text)
98
99
        self.assertEqual(500, response.status_code)
100
        self.assertEqual("Error processing script", result["message"])
101
        self.assertTrue(result["info"].startswith("SyntaxError"))
102