Passed
Push — master ( 048f62...22adbf )
by Oleksandr
02:40
created

EvaluationPlaneHandler.initialize()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 5
rs 10
c 0
b 0
f 0
cc 1
nop 3
1
from tabpy.tabpy_server.handlers import BaseHandler
2
import json
3
import simplejson
4
import logging
5
from tabpy.tabpy_server.common.util import format_exception
6
import requests
7
from tornado import gen
8
from datetime import timedelta
9
10
11
class RestrictedTabPy:
12
    def __init__(self, protocol, port, logger, timeout):
13
        self.protocol = protocol
14
        self.port = port
15
        self.logger = logger
16
        self.timeout = timeout
17
18
    def query(self, name, *args, **kwargs):
19
        url = f"{self.protocol}://localhost:{self.port}/query/{name}"
20
        self.logger.log(logging.DEBUG, f"Querying {url}...")
21
        internal_data = {"data": args or kwargs}
22
        data = json.dumps(internal_data)
23
        headers = {"content-type": "application/json"}
24
        response = requests.post(
25
            url=url, data=data, headers=headers, timeout=self.timeout, verify=False
26
        )
27
        return response.json()
28
29
30
class EvaluationPlaneHandler(BaseHandler):
31
    """
32
    EvaluationPlaneHandler is responsible for running arbitrary python scripts.
33
    """
34
35
    def initialize(self, executor, app):
36
        super(EvaluationPlaneHandler, self).initialize(app)
37
        self.executor = executor
38
        self._error_message_timeout = (
39
            f"User defined script timed out. "
40
            f"Timeout is set to {self.eval_timeout} s."
41
        )
42
43
    @gen.coroutine
44
    def post(self):
45
        if self.should_fail_with_not_authorized():
46
            self.fail_with_not_authorized()
47
            return
48
49
        self._add_CORS_header()
50
        try:
51
            body = json.loads(self.request.body.decode("utf-8"))
52
            if "script" not in body:
53
                self.error_out(400, "Script is empty.")
54
                return
55
56
            # Transforming user script into a proper function.
57
            user_code = body["script"]
58
            arguments = None
59
            arguments_str = ""
60
            if "data" in body:
61
                arguments = body["data"]
62
63
            if arguments is not None:
64
                if not isinstance(arguments, dict):
65
                    self.error_out(
66
                        400, "Script parameters need to be provided as a dictionary."
67
                    )
68
                    return
69
                args_in = sorted(arguments.keys())
70
                n = len(arguments)
71
                if sorted('_arg'+str(i+1) for i in range(n)) == args_in:
72
                    arguments_str = ", " + ", ".join(args_in)
73
                else:
74
                    self.error_out(
75
                        400,
76
                        "Variables names should follow "
77
                        "the format _arg1, _arg2, _argN",
78
                    )
79
                    return
80
81
            function_to_evaluate = f"def _user_script(tabpy{arguments_str}):\n"
82
            for u in user_code.splitlines():
83
                function_to_evaluate += " " + u + "\n"
84
85
            self.logger.log(
86
                logging.INFO, f"function to evaluate={function_to_evaluate}"
87
            )
88
89
            try:
90
                result = yield self._call_subprocess(function_to_evaluate, arguments)
91
            except (
92
                gen.TimeoutError,
93
                requests.exceptions.ConnectTimeout,
94
                requests.exceptions.ReadTimeout,
95
            ):
96
                self.logger.log(logging.ERROR, self._error_message_timeout)
97
                self.error_out(408, self._error_message_timeout)
98
                return
99
100
            self.write(simplejson.dumps(result, ignore_nan=True))
101
            self.finish()
102
103
        except Exception as e:
104
            err_msg = f"{e.__class__.__name__} : {str(e)}"
105
            if err_msg != "KeyError : 'response'":
106
                err_msg = format_exception(e, "POST /evaluate")
107
                self.error_out(500, "Error processing script", info=err_msg)
108
            else:
109
                self.error_out(
110
                    404,
111
                    "Error processing script",
112
                    info="The endpoint you're "
113
                    "trying to query did not respond. Please make sure the "
114
                    "endpoint exists and the correct set of arguments are "
115
                    "provided.",
116
                )
117
118
    @gen.coroutine
119
    def _call_subprocess(self, function_to_evaluate, arguments):
120
        restricted_tabpy = RestrictedTabPy(
121
            self.protocol, self.port, self.logger, self.eval_timeout
122
        )
123
        # Exec does not run the function, so it does not block.
124
        exec(function_to_evaluate, globals())
125
126
        if arguments is None:
127
            future = self.executor.submit(_user_script, restricted_tabpy)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable _user_script does not seem to be defined.
Loading history...
128
        else:
129
            future = self.executor.submit(_user_script, restricted_tabpy, **arguments)
130
131
        ret = yield gen.with_timeout(timedelta(seconds=self.eval_timeout), future)
132
        raise gen.Return(ret)
133