Test Failed
Pull Request — master (#592)
by
unknown
06:58
created

tabpy.tabpy_server.handlers.evaluation_plane_handler   A

Complexity

Total Complexity 23

Size/Duplication

Total Lines 196
Duplicated Lines 0 %

Test Coverage

Coverage 85.87%

Importance

Changes 0
Metric Value
wmc 23
eloc 133
dl 0
loc 196
ccs 79
cts 92
cp 0.8587
rs 10
c 0
b 0
f 0

10 Methods

Rating   Name   Duplication   Size   Complexity  
C EvaluationPlaneHandler._post_impl() 0 70 9
A EvaluationPlaneDisabledHandler.initialize() 0 3 1
A RestrictedTabPy.query() 0 10 1
A EvaluationPlaneDisabledHandler.post() 0 6 2
A RestrictedTabPy.__init__() 0 6 1
A EvaluationPlaneHandler.get_arrow_data() 0 8 1
A EvaluationPlaneHandler._call_subprocess() 0 18 2
A EvaluationPlaneHandler.initialize() 0 5 1
A EvaluationPlaneHandler.post() 0 19 4
A EvaluationPlaneHandler.upload_arrow_data() 0 8 1
1 1
import pandas
2 1
import pyarrow
3 1
4 1
from tabpy.tabpy_server.handlers import BaseHandler, arrow_client
5 1
import json
6 1
import simplejson
7 1
import logging
8 1
from tabpy.tabpy_server.common.util import format_exception
9 1
import requests
10
from tornado import gen
11
from datetime import timedelta
12 1
from tabpy.tabpy_server.handlers.util import AuthErrorStates
13 1
14 1
class RestrictedTabPy:
    """Small client object handed to user scripts as their ``tabpy`` argument.

    It lets a script call endpoints deployed on this same server via
    ``tabpy.query(name, ...)``.
    """

    def __init__(self, protocol, port, logger, timeout, headers):
        # Scheme and port used to reach this server on localhost.
        self.protocol, self.port = protocol, port
        self.logger = logger
        # Passed straight through to requests.post(timeout=...).
        self.timeout = timeout
        # Headers forwarded with every query request.
        self.headers = headers

    def query(self, name, *args, **kwargs):
        """POST arguments to ``/query/<name>`` on localhost and decode the reply.

        Positional arguments are sent when any are given; otherwise the
        keyword arguments are sent. Either way they go under the ``"data"``
        key of a JSON payload. Returns ``response.json()``.
        """
        endpoint = f"{self.protocol}://localhost:{self.port}/query/{name}"
        self.logger.log(logging.DEBUG, f"Querying {endpoint}...")
        payload = json.dumps({"data": args or kwargs})
        # Certificate verification is switched off for this loopback call.
        reply = requests.post(
            url=endpoint,
            data=payload,
            headers=self.headers,
            timeout=self.timeout,
            verify=False,
        )
        return reply.json()
32 1
33
34
class EvaluationPlaneDisabledHandler(BaseHandler):
    """
    EvaluationPlaneDisabledHandler rejects evaluation requests with an error
    message because ad-hoc scripts have been disabled.
    """

    def initialize(self, executor, app):
        """Let BaseHandler initialize from ``app`` and keep the executor."""
        super().initialize(app)
        self.executor = executor

    @gen.coroutine
    def post(self):
        """Answer with an auth error if authentication fails, else with 404."""
        if self.should_fail_with_auth_error() != AuthErrorStates.NONE:
            self.fail_with_auth_error()
        else:
            self.error_out(
                404,
                "Ad-hoc scripts have been disabled on this analytics extension, "
                "please contact your administrator.",
            )
50 1
51
52
class EvaluationPlaneHandler(BaseHandler):
    """
    EvaluationPlaneHandler is responsible for running arbitrary python scripts.

    In this variant the value of ``_arg1`` is not taken from the JSON request
    body. Instead, it is replaced by data fetched from an Arrow Flight server
    (path ``input.csv``). A non-None script result is uploaded back to the
    same server (path ``output.csv``). The HTTP response body is always
    ``null``.
    """

    # Location of the Arrow Flight server used for input/output exchange.
    # NOTE(review): hard-coded; consider moving these into the server config.
    _FLIGHT_SCHEME = "grpc+tcp"
    _FLIGHT_HOST = "localhost"
    _FLIGHT_PORT = 5005
    _FLIGHT_INPUT_PATH = "input.csv"
    _FLIGHT_OUTPUT_PATH = "output.csv"

    def initialize(self, executor, app):
        """Initialize BaseHandler from ``app`` and keep the script executor."""
        super(EvaluationPlaneHandler, self).initialize(app)
        self.executor = executor
        self._error_message_timeout = (
            f"User defined script timed out. "
            f"Timeout is set to {self.eval_timeout} s."
        )

    @gen.coroutine
    def _post_impl(self):
        """Validate the request, run the user script and publish its result.

        The JSON body must contain ``script``. An optional ``data`` dict may
        be given, whose keys must be exactly ``_arg1`` ... ``_argN``.
        Validation failures are answered with HTTP 400; script timeouts
        with HTTP 408.
        """
        body = json.loads(self.request.body.decode("utf-8"))
        self.logger.log(logging.DEBUG, f"Processing POST request '{body}'...")
        if "script" not in body:
            self.error_out(400, "Script is empty.")
            return

        # Transforming user script into a proper function.
        user_code = body["script"]
        arguments = body.get("data")
        arguments_str = ""

        if arguments is not None:
            if not isinstance(arguments, dict):
                self.error_out(
                    400, "Script parameters need to be provided as a dictionary."
                )
                return
            args_in = sorted(arguments.keys())
            expected = sorted(f"_arg{i}" for i in range(1, len(arguments) + 1))
            if expected != args_in:
                self.error_out(
                    400,
                    "Variables names should follow "
                    "the format _arg1, _arg2, _argN",
                )
                return
            arguments_str = ", " + ", ".join(args_in)

        # Each script line is indented by one space to become the body of
        # _user_script(tabpy, _arg1, ...).
        function_to_evaluate = f"def _user_script(tabpy{arguments_str}):\n" + "".join(
            f" {line}\n" for line in user_code.splitlines()
        )

        self.logger.log(
            logging.INFO, f"function to evaluate={function_to_evaluate}"
        )

        # _arg1 is supplied out-of-band through Arrow Flight rather than via
        # the JSON body. Only inject it when the wrapped function declares
        # _arg1. Otherwise (no "data", or an empty dict) the assignment or
        # the later call would fail with TypeError.
        # NOTE(review): this is a blocking gRPC call on the IOLoop thread.
        if arguments is not None and "_arg1" in arguments:
            arguments["_arg1"] = self.get_arrow_data(self._FLIGHT_INPUT_PATH)

        try:
            result = yield self._call_subprocess(function_to_evaluate, arguments)
        except (
            gen.TimeoutError,
            requests.exceptions.ConnectTimeout,
            requests.exceptions.ReadTimeout,
        ):
            self.logger.log(logging.ERROR, self._error_message_timeout)
            self.error_out(408, self._error_message_timeout)
            return

        if result is not None:
            # NOTE(review): nothing is uploaded for a None result. Confirm
            # whether a client could then read a stale output.csv.
            self.upload_arrow_data(result, self._FLIGHT_OUTPUT_PATH)
        # The result travels through Arrow Flight; the HTTP body is always null.
        self.write("null")
        self.finish()

    def _flight_client(self):
        """Create a client for the configured Arrow Flight server."""
        # Import the submodule explicitly: a plain ``import pyarrow`` does
        # not by itself guarantee that ``pyarrow.flight`` is loaded.
        import pyarrow.flight

        location = f"{self._FLIGHT_SCHEME}://{self._FLIGHT_HOST}:{self._FLIGHT_PORT}"
        return pyarrow.flight.FlightClient(location)

    def get_arrow_data(self, filename):
        """Fetch the flight stored under ``filename`` from the Flight server."""
        return arrow_client.get_flight_by_path(filename, self._flight_client())

    def upload_arrow_data(self, data, filename):
        """Upload ``data`` to the Flight server under ``filename``."""
        return arrow_client.upload_data(self._flight_client(), data, filename)

    @gen.coroutine
    def post(self):
        """Handle a script evaluation request.

        Any exception escaping ``_post_impl`` becomes HTTP 500. The one
        exception is ``KeyError: 'response'``, which is reported as 404
        with a hint that the queried endpoint did not respond.
        """
        if self.should_fail_with_auth_error() != AuthErrorStates.NONE:
            self.fail_with_auth_error()
            return

        self._add_CORS_header()
        try:
            yield self._post_impl()
        except Exception as e:
            err_msg = f"{e.__class__.__name__} : {str(e)}"
            if err_msg != "KeyError : 'response'":
                err_msg = format_exception(e, "POST /evaluate")
                self.error_out(500, "Error processing script", info=err_msg)
            else:
                self.error_out(
                    404,
                    "Error processing script",
                    info="The endpoint you're "
                    "trying to query did not respond. Please make sure the "
                    "endpoint exists and the correct set of arguments are "
                    "provided.",
                )

    @gen.coroutine
    def _call_subprocess(self, function_to_evaluate, arguments):
        """Define the wrapped user function and run it on the executor.

        Resolves to the script's return value. Raises ``gen.TimeoutError``
        when the script runs longer than ``eval_timeout`` seconds.
        """
        restricted_tabpy = RestrictedTabPy(
            self.protocol, self.port, self.logger, self.eval_timeout, self.request.headers
        )
        # SECURITY: this executes arbitrary request-supplied code in the
        # server process. It is presumably gated by the ad-hoc-scripts
        # setting (EvaluationPlaneDisabledHandler); confirm the routing.
        # exec() only defines _user_script in this module's globals and does
        # not call it, so it does not block.
        exec(function_to_evaluate, globals())
        # Look the function up explicitly rather than by a bare name that
        # only exists after exec() and that static analysers flag as undefined.
        user_script = globals()["_user_script"]

        # With no "data" in the request, arguments is None; unpacking None
        # with ** would raise TypeError, so pass no keyword arguments instead.
        future = self.executor.submit(
            user_script, restricted_tabpy, **(arguments or {})
        )

        ret = yield gen.with_timeout(timedelta(seconds=self.eval_timeout), future)
        raise gen.Return(ret)
196