1
|
1 |
|
from tabpy.tabpy_server.handlers import BaseHandler |
2
|
1 |
|
import json |
3
|
1 |
|
import simplejson |
4
|
1 |
|
import logging |
5
|
1 |
|
from tabpy.tabpy_server.common.util import format_exception |
6
|
1 |
|
import requests |
7
|
1 |
|
from tornado import gen |
8
|
1 |
|
from datetime import timedelta |
9
|
|
|
|
10
|
|
|
|
11
|
1 |
|
class RestrictedTabPy: |
12
|
1 |
|
def __init__(self, protocol, port, logger, timeout): |
13
|
1 |
|
self.protocol = protocol |
14
|
1 |
|
self.port = port |
15
|
1 |
|
self.logger = logger |
16
|
1 |
|
self.timeout = timeout |
17
|
|
|
|
18
|
1 |
|
def query(self, name, *args, **kwargs): |
19
|
|
|
url = f"{self.protocol}://localhost:{self.port}/query/{name}" |
20
|
|
|
self.logger.log(logging.DEBUG, f"Querying {url}...") |
21
|
|
|
internal_data = {"data": args or kwargs} |
22
|
|
|
data = json.dumps(internal_data) |
23
|
|
|
headers = {"content-type": "application/json"} |
24
|
|
|
response = requests.post( |
25
|
|
|
url=url, data=data, headers=headers, timeout=self.timeout, verify=False |
26
|
|
|
) |
27
|
|
|
return response.json() |
28
|
|
|
|
29
|
|
|
|
30
|
1 |
|
class EvaluationPlaneHandler(BaseHandler): |
31
|
|
|
""" |
32
|
|
|
EvaluationPlaneHandler is responsible for running arbitrary python scripts. |
33
|
|
|
""" |
34
|
|
|
|
35
|
1 |
|
def initialize(self, executor, app): |
36
|
1 |
|
super(EvaluationPlaneHandler, self).initialize(app) |
37
|
1 |
|
self.executor = executor |
38
|
1 |
|
self._error_message_timeout = ( |
39
|
|
|
f"User defined script timed out. " |
40
|
|
|
f"Timeout is set to {self.eval_timeout} s." |
41
|
|
|
) |
42
|
|
|
|
43
|
1 |
|
@gen.coroutine |
44
|
|
|
def _post_impl(self): |
45
|
1 |
|
body = json.loads(self.request.body.decode("utf-8")) |
46
|
1 |
|
self.logger.log(logging.DEBUG, f"Processing POST request '{body}'...") |
47
|
1 |
|
if "script" not in body: |
48
|
1 |
|
self.error_out(400, "Script is empty.") |
49
|
1 |
|
return |
50
|
|
|
|
51
|
|
|
# Transforming user script into a proper function. |
52
|
1 |
|
user_code = body["script"] |
53
|
1 |
|
arguments = None |
54
|
1 |
|
arguments_str = "" |
55
|
1 |
|
if "data" in body: |
56
|
1 |
|
arguments = body["data"] |
57
|
|
|
|
58
|
1 |
|
if arguments is not None: |
59
|
1 |
|
if not isinstance(arguments, dict): |
60
|
|
|
self.error_out( |
61
|
|
|
400, "Script parameters need to be provided as a dictionary." |
62
|
|
|
) |
63
|
|
|
return |
64
|
1 |
|
args_in = sorted(arguments.keys()) |
65
|
1 |
|
n = len(arguments) |
66
|
1 |
|
if sorted('_arg'+str(i+1) for i in range(n)) == args_in: |
67
|
1 |
|
arguments_str = ", " + ", ".join(args_in) |
68
|
|
|
else: |
69
|
1 |
|
self.error_out( |
70
|
|
|
400, |
71
|
|
|
"Variables names should follow " |
72
|
|
|
"the format _arg1, _arg2, _argN", |
73
|
|
|
) |
74
|
1 |
|
return |
75
|
|
|
|
76
|
1 |
|
function_to_evaluate = f"def _user_script(tabpy{arguments_str}):\n" |
77
|
1 |
|
for u in user_code.splitlines(): |
78
|
1 |
|
function_to_evaluate += " " + u + "\n" |
79
|
|
|
|
80
|
1 |
|
self.logger.log( |
81
|
|
|
logging.INFO, f"function to evaluate={function_to_evaluate}" |
82
|
|
|
) |
83
|
|
|
|
84
|
1 |
|
try: |
85
|
1 |
|
result = yield self._call_subprocess(function_to_evaluate, arguments) |
86
|
1 |
|
except ( |
87
|
|
|
gen.TimeoutError, |
88
|
|
|
requests.exceptions.ConnectTimeout, |
89
|
|
|
requests.exceptions.ReadTimeout, |
90
|
|
|
): |
91
|
|
|
self.logger.log(logging.ERROR, self._error_message_timeout) |
92
|
|
|
self.error_out(408, self._error_message_timeout) |
93
|
|
|
return |
94
|
|
|
|
95
|
1 |
|
if result is not None: |
96
|
1 |
|
self.write(simplejson.dumps(result, ignore_nan=True)) |
97
|
|
|
else: |
98
|
1 |
|
self.write("null") |
99
|
1 |
|
self.finish() |
100
|
|
|
|
101
|
1 |
|
@gen.coroutine |
102
|
|
|
def post(self): |
103
|
1 |
|
if self.should_fail_with_not_authorized(): |
104
|
1 |
|
self.fail_with_not_authorized() |
105
|
1 |
|
return |
106
|
|
|
|
107
|
1 |
|
self._add_CORS_header() |
108
|
1 |
|
try: |
109
|
1 |
|
yield self._post_impl() |
110
|
1 |
|
except Exception as e: |
111
|
1 |
|
err_msg = f"{e.__class__.__name__} : {str(e)}" |
112
|
1 |
|
if err_msg != "KeyError : 'response'": |
113
|
1 |
|
err_msg = format_exception(e, "POST /evaluate") |
114
|
1 |
|
self.error_out(500, "Error processing script", info=err_msg) |
115
|
|
|
else: |
116
|
|
|
self.error_out( |
117
|
|
|
404, |
118
|
|
|
"Error processing script", |
119
|
|
|
info="The endpoint you're " |
120
|
|
|
"trying to query did not respond. Please make sure the " |
121
|
|
|
"endpoint exists and the correct set of arguments are " |
122
|
|
|
"provided.", |
123
|
|
|
) |
124
|
|
|
|
125
|
1 |
|
@gen.coroutine |
126
|
|
|
def _call_subprocess(self, function_to_evaluate, arguments): |
127
|
1 |
|
restricted_tabpy = RestrictedTabPy( |
128
|
|
|
self.protocol, self.port, self.logger, self.eval_timeout |
129
|
|
|
) |
130
|
|
|
# Exec does not run the function, so it does not block. |
131
|
1 |
|
exec(function_to_evaluate, globals()) |
132
|
|
|
|
133
|
|
|
# 'noqa' comments below tell flake8 to ignore undefined _user_script |
134
|
|
|
# name - the name is actually defined with user script being wrapped |
135
|
|
|
# in _user_script function (constructed as a striong) and then executed |
136
|
|
|
# with exec() call above. |
137
|
1 |
|
future = self.executor.submit(_user_script, # noqa: F821 |
|
|
|
|
138
|
1 |
|
restricted_tabpy, |
139
|
|
|
**arguments if arguments is not None else None) |
140
|
|
|
|
141
|
1 |
|
ret = yield gen.with_timeout(timedelta(seconds=self.eval_timeout), future) |
142
|
|
|
raise gen.Return(ret) |
143
|
|
|
|