Test Failed
Push — master ( e8b070...b526e1 )
by Oleksandr
33:34 queued 26:33
created

handlers.evaluation_plane_handler   A

Complexity

Total Complexity 18

Size/Duplication

Total Lines 143
Duplicated Lines 0 %

Test Coverage

Coverage 83.95%

Importance

Changes 0
Metric Value
wmc 18
eloc 100
dl 0
loc 143
ccs 68
cts 81
cp 0.8395
rs 10
c 0
b 0
f 0
1 1
from tabpy.tabpy_server.handlers import BaseHandler
2 1
import json
3 1
import simplejson
4 1
import logging
5 1
from tabpy.tabpy_server.common.util import format_exception
6 1
import requests
7 1
from tornado import gen
8 1
from datetime import timedelta
9
from tabpy.tabpy_server.handlers.util import AuthErrorStates
10
11 1
12 1
class RestrictedTabPy:
13 1
    def __init__(self, protocol, port, logger, timeout):
14 1
        self.protocol = protocol
15 1
        self.port = port
16 1
        self.logger = logger
17
        self.timeout = timeout
18 1
19
    def query(self, name, *args, **kwargs):
20
        url = f"{self.protocol}://localhost:{self.port}/query/{name}"
21
        self.logger.log(logging.DEBUG, f"Querying {url}...")
22
        internal_data = {"data": args or kwargs}
23
        data = json.dumps(internal_data)
24
        headers = {"content-type": "application/json"}
25
        response = requests.post(
26
            url=url, data=data, headers=headers, timeout=self.timeout, verify=False
27
        )
28
        return response.json()
29
30 1
31
class EvaluationPlaneHandler(BaseHandler):
32
    """
33
    EvaluationPlaneHandler is responsible for running arbitrary python scripts.
34
    """
35 1
36 1
    def initialize(self, executor, app):
37 1
        super(EvaluationPlaneHandler, self).initialize(app)
38 1
        self.executor = executor
39
        self._error_message_timeout = (
40
            f"User defined script timed out. "
41
            f"Timeout is set to {self.eval_timeout} s."
42
        )
43 1
44
    @gen.coroutine
45 1
    def _post_impl(self):
46 1
        body = json.loads(self.request.body.decode("utf-8"))
47 1
        self.logger.log(logging.DEBUG, f"Processing POST request '{body}'...")
48 1
        if "script" not in body:
49 1
            self.error_out(400, "Script is empty.")
50
            return
51
52 1
        # Transforming user script into a proper function.
53 1
        user_code = body["script"]
54 1
        arguments = None
55 1
        arguments_str = ""
56 1
        if "data" in body:
57
            arguments = body["data"]
58 1
59 1
        if arguments is not None:
60
            if not isinstance(arguments, dict):
61
                self.error_out(
62
                    400, "Script parameters need to be provided as a dictionary."
63
                )
64 1
                return
65 1
            args_in = sorted(arguments.keys())
66 1
            n = len(arguments)
67 1
            if sorted('_arg'+str(i+1) for i in range(n)) == args_in:
68
                arguments_str = ", " + ", ".join(args_in)
69 1
            else:
70
                self.error_out(
71
                    400,
72
                    "Variables names should follow "
73
                    "the format _arg1, _arg2, _argN",
74 1
                )
75
                return
76 1
77 1
        function_to_evaluate = f"def _user_script(tabpy{arguments_str}):\n"
78 1
        for u in user_code.splitlines():
79
            function_to_evaluate += " " + u + "\n"
80 1
81
        self.logger.log(
82
            logging.INFO, f"function to evaluate={function_to_evaluate}"
83
        )
84 1
85 1
        try:
86 1
            result = yield self._call_subprocess(function_to_evaluate, arguments)
87
        except (
88
            gen.TimeoutError,
89
            requests.exceptions.ConnectTimeout,
90
            requests.exceptions.ReadTimeout,
91
        ):
92
            self.logger.log(logging.ERROR, self._error_message_timeout)
93
            self.error_out(408, self._error_message_timeout)
94
            return
95 1
96 1
        if result is not None:
97
            self.write(simplejson.dumps(result, ignore_nan=True))
98 1
        else:
99 1
            self.write("null")
100
        self.finish()
101 1
102
    @gen.coroutine
103 1
    def post(self):
104 1
        if self.should_fail_with_auth_error() != AuthErrorStates.NONE:
105 1
            self.fail_with_auth_error()
106
            return
107 1
108 1
        self._add_CORS_header()
109 1
        try:
110 1
            yield self._post_impl()
111 1
        except Exception as e:
112 1
            err_msg = f"{e.__class__.__name__} : {str(e)}"
113 1
            if err_msg != "KeyError : 'response'":
114 1
                err_msg = format_exception(e, "POST /evaluate")
115
                self.error_out(500, "Error processing script", info=err_msg)
116
            else:
117
                self.error_out(
118
                    404,
119
                    "Error processing script",
120
                    info="The endpoint you're "
121
                    "trying to query did not respond. Please make sure the "
122
                    "endpoint exists and the correct set of arguments are "
123
                    "provided.",
124
                )
125 1
126
    @gen.coroutine
127 1
    def _call_subprocess(self, function_to_evaluate, arguments):
128
        restricted_tabpy = RestrictedTabPy(
129
            self.protocol, self.port, self.logger, self.eval_timeout
130
        )
131 1
        # Exec does not run the function, so it does not block.
132
        exec(function_to_evaluate, globals())
133
134
        # 'noqa' comments below tell flake8 to ignore undefined _user_script
135
        # name - the name is actually defined with user script being wrapped
136
        # in _user_script function (constructed as a striong) and then executed
137 1
        # with exec() call above.
138 1
        future = self.executor.submit(_user_script,  # noqa: F821
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable _user_script does not seem to be defined.
Loading history...
139
                                      restricted_tabpy,
140
                                      **arguments if arguments is not None else None)
141 1
142
        ret = yield gen.with_timeout(timedelta(seconds=self.eval_timeout), future)
143
        raise gen.Return(ret)
144