Passed
Push — master ( 53016e...38fde4 )
by Oleksandr
02:52
created

test_evaluation_plane_handler   A

Complexity

Total Complexity 11

Size/Duplication

Total Lines 206
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 11
eloc 126
dl 0
loc 206
rs 10
c 0
b 0
f 0

11 Methods

Rating   Name   Duplication   Size   Complexity  
A TestEvaluationPlainHandlerWithAuth.get_app() 0 3 1
A TestEvaluationPlainHandlerWithAuth.test_invalid_creds_fails() 0 14 1
A TestEvaluationPlainHandlerWithAuth.setUpClass() 0 73 1
A TestEvaluationPlainHandlerWithAuth.test_script_returns_none() 0 13 1
A TestEvaluationPlainHandlerWithAuth.test_script_not_present() 0 14 1
A TestEvaluationPlainHandlerWithAuth.test_arguments_not_sequential() 0 14 1
A TestEvaluationPlainHandlerWithAuth.test_valid_creds_pass() 0 14 1
A TestEvaluationPlainHandlerWithAuth.test_no_creds_required_auth_fails() 0 3 1
A TestEvaluationPlainHandlerWithAuth.test_arguments_not_present() 0 15 1
A TestEvaluationPlainHandlerWithAuth.test_nan_converts_to_null() 0 13 1
A TestEvaluationPlainHandlerWithAuth.tearDownClass() 0 6 1
1
import base64
2
import os
3
import tempfile
4
5
from argparse import Namespace
6
from tabpy.tabpy_server.app.app import TabPyApp
7
from tabpy.tabpy_server.handlers.util import hash_password
8
from tornado.testing import AsyncHTTPTestCase
9
10
11
class TestEvaluationPlainHandlerWithAuth(AsyncHTTPTestCase):
12
    @classmethod
13
    def setUpClass(cls):
14
        prefix = "__TestEvaluationPlainHandlerWithAuth_"
15
        # create password file
16
        cls.pwd_file = tempfile.NamedTemporaryFile(
17
            mode="w+t", prefix=prefix, suffix=".txt", delete=False
18
        )
19
        username = "username"
20
        password = "password"
21
        cls.pwd_file.write(f"{username} {hash_password(username, password)}\n")
22
        cls.pwd_file.close()
23
24
        # create state.ini dir and file
25
        cls.state_dir = tempfile.mkdtemp(prefix=prefix)
26
        cls.state_file = open(os.path.join(cls.state_dir, "state.ini"), "w+")
27
        cls.state_file.write(
28
            "[Service Info]\n"
29
            "Name = TabPy Serve\n"
30
            "Description = \n"
31
            "Creation Time = 0\n"
32
            "Access-Control-Allow-Origin = \n"
33
            "Access-Control-Allow-Headers = \n"
34
            "Access-Control-Allow-Methods = \n"
35
            "\n"
36
            "[Query Objects Service Versions]\n"
37
            "\n"
38
            "[Query Objects Docstrings]\n"
39
            "\n"
40
            "[Meta]\n"
41
            "Revision Number = 1\n"
42
        )
43
        cls.state_file.close()
44
45
        # create config file
46
        cls.config_file = tempfile.NamedTemporaryFile(
47
            mode="w+t", prefix=prefix, suffix=".conf", delete=False
48
        )
49
        cls.config_file.write(
50
            "[TabPy]\n"
51
            f"TABPY_PWD_FILE = {cls.pwd_file.name}\n"
52
            f"TABPY_STATE_PATH = {cls.state_dir}"
53
        )
54
        cls.config_file.close()
55
56
        cls.script = (
57
            '{"data":{"_arg1":[2,3],"_arg2":[3,-1]},'
58
            '"script":"res=[]\\nfor i in range(len(_arg1)):\\n  '
59
            'res.append(_arg1[i] * _arg2[i])\\nreturn res"}'
60
        )
61
62
        cls.script_not_present = (
63
            '{"data":{"_arg1":[2,3],"_arg2":[3,-1]},'
64
            '"":"res=[]\\nfor i in range(len(_arg1)):\\n  '
65
            'res.append(_arg1[i] * _arg2[i])\\nreturn res"}'
66
        )
67
68
        cls.args_not_present = (
69
            '{"script":"res=[]\\nfor i in range(len(_arg1)):\\n  '
70
            'res.append(_arg1[i] * _arg2[i])\\nreturn res"}'
71
        )
72
73
        cls.args_not_sequential = (
74
            '{"data":{"_arg1":[2,3],"_arg3":[3,-1]},'
75
            '"script":"res=[]\\nfor i in range(len(_arg1)):\\n  '
76
            'res.append(_arg1[i] * _arg3[i])\\nreturn res"}'
77
        )
78
79
        cls.nan_coverts_to_null =\
80
            '{"data":{"_arg1":[2,3],"_arg2":[3,-1]},'\
81
            '"script":"return [float(1), float(\\"NaN\\"), float(2)]"}'
82
83
        cls.script_returns_none = (
84
            '{"data":{"_arg1":[2,3],"_arg2":[3,-1]},'
85
            '"script":"return None"}'
86
        )
87
88
    @classmethod
89
    def tearDownClass(cls):
90
        os.remove(cls.pwd_file.name)
91
        os.remove(cls.state_file.name)
92
        os.remove(cls.config_file.name)
93
        os.rmdir(cls.state_dir)
94
95
    def get_app(self):
96
        self.app = TabPyApp(self.config_file.name)
97
        return self.app._create_tornado_web_app()
98
99
    def test_no_creds_required_auth_fails(self):
100
        response = self.fetch("/evaluate", method="POST", body=self.script)
101
        self.assertEqual(401, response.code)
102
103
    def test_invalid_creds_fails(self):
104
        response = self.fetch(
105
            "/evaluate",
106
            method="POST",
107
            body=self.script,
108
            headers={
109
                "Authorization": "Basic {}".format(
110
                    base64.b64encode("user:wrong_password".encode("utf-8")).decode(
111
                        "utf-8"
112
                    )
113
                )
114
            },
115
        )
116
        self.assertEqual(401, response.code)
117
118
    def test_valid_creds_pass(self):
119
        response = self.fetch(
120
            "/evaluate",
121
            method="POST",
122
            body=self.script,
123
            headers={
124
                "Authorization": "Basic {}".format(
125
                    base64.b64encode("username:password".encode("utf-8")).decode(
126
                        "utf-8"
127
                    )
128
                )
129
            },
130
        )
131
        self.assertEqual(200, response.code)
132
133
    def test_script_not_present(self):
134
        response = self.fetch(
135
            "/evaluate",
136
            method="POST",
137
            body=self.script_not_present,
138
            headers={
139
                "Authorization": "Basic {}".format(
140
                    base64.b64encode("username:password".encode("utf-8")).decode(
141
                        "utf-8"
142
                    )
143
                )
144
            },
145
        )
146
        self.assertEqual(400, response.code)
147
148
    def test_arguments_not_present(self):
149
        response = self.fetch(
150
            "/evaluate",
151
            method="POST",
152
            body=self.args_not_present,
153
            headers={
154
                "Authorization": "Basic {}".format(
155
                    base64.b64encode("username:password".encode("utf-8")).decode(
156
                        "utf-8"
157
                    )
158
                )
159
160
            },
161
        )
162
        self.assertEqual(500, response.code)
163
164
    def test_arguments_not_sequential(self):
165
        response = self.fetch(
166
            "/evaluate",
167
            method="POST",
168
            body=self.args_not_sequential,
169
            headers={
170
                "Authorization": "Basic {}".format(
171
                    base64.b64encode("username:password".encode("utf-8")).decode(
172
                        "utf-8"
173
                    )
174
                )
175
            },
176
        )
177
        self.assertEqual(400, response.code)
178
179
    def test_nan_converts_to_null(self):
180
        response = self.fetch(
181
            '/evaluate',
182
            method='POST',
183
            body=self.nan_coverts_to_null,
184
            headers={
185
                'Authorization': 'Basic {}'.
186
                format(
187
                    base64.b64encode('username:password'.encode('utf-8')).
188
                    decode('utf-8'))
189
            })
190
        self.assertEqual(200, response.code)
191
        self.assertEqual(b'[1.0, null, 2.0]', response.body)
192
193
    def test_script_returns_none(self):
194
        response = self.fetch(
195
            '/evaluate',
196
            method='POST',
197
            body=self.script_returns_none,
198
            headers={
199
                'Authorization': 'Basic {}'.
200
                format(
201
                    base64.b64encode('username:password'.encode('utf-8')).
202
                    decode('utf-8'))
203
            })
204
        self.assertEqual(200, response.code)
205
        self.assertEqual(b'null', response.body)
206