1
|
1 |
|
from tabpy.tabpy_server.handlers import BaseHandler |
2
|
1 |
|
import json |
3
|
1 |
|
import simplejson |
4
|
1 |
|
import logging |
5
|
1 |
|
from tabpy.tabpy_server.common.util import format_exception |
6
|
1 |
|
import requests |
7
|
1 |
|
from tornado import gen |
8
|
1 |
|
from datetime import timedelta |
9
|
1 |
|
from tabpy.tabpy_server.handlers.util import AuthErrorStates |
10
|
|
|
|
11
|
|
|
|
12
|
1 |
|
class RestrictedTabPy:
    """Object handed to user scripts as their ``tabpy`` argument.

    Its only operation, ``query``, POSTs to a ``/query/<name>`` endpoint on
    ``localhost`` using the protocol and port supplied at construction.
    """

    def __init__(self, protocol, port, logger, timeout, headers):
        # URL scheme and port used to reach the local server.
        self.protocol, self.port = protocol, port
        # Logger that receives a DEBUG line for every query.
        self.logger = logger
        # Passed verbatim as the ``timeout`` of ``requests.post``.
        self.timeout = timeout
        # HTTP headers forwarded unchanged with every query.
        self.headers = headers

    def query(self, name, *args, **kwargs):
        """Call the endpoint *name* on this server and return its decoded JSON.

        The request body is ``{"data": ...}``. If any positional arguments are
        given they are sent (as a JSON array); otherwise the keyword arguments
        are sent (as a JSON object). TLS certificate verification is disabled
        (``verify=False``) — presumably because the target is always
        ``localhost``; confirm before reusing elsewhere.
        """
        endpoint_url = f"{self.protocol}://localhost:{self.port}/query/{name}"
        self.logger.log(logging.DEBUG, f"Querying {endpoint_url}...")

        payload = json.dumps({"data": args if args else kwargs})
        reply = requests.post(
            url=endpoint_url,
            data=payload,
            headers=self.headers,
            timeout=self.timeout,
            verify=False,
        )
        return reply.json()
30
|
|
|
|
31
|
|
|
|
32
|
1 |
|
class EvaluationPlaneDisabledHandler(BaseHandler):
    """
    Handler used when ad-hoc scripts have been disabled: every POST is
    answered with a 404 error message instead of evaluating anything.
    """

    def initialize(self, executor, app):
        # Same initialize(executor, app) signature as EvaluationPlaneHandler;
        # the executor is stored but never used by this handler.
        super().initialize(app)
        self.executor = executor

    @gen.coroutine
    def post(self):
        disabled_message = (
            "Ad-hoc scripts have been disabled on this analytics extension, "
            "please contact your administrator."
        )
        self.error_out(404, disabled_message)
45
|
|
|
|
46
|
|
|
|
47
|
1 |
|
class EvaluationPlaneHandler(BaseHandler):
    """
    EvaluationPlaneHandler is responsible for running arbitrary python scripts.

    The POST body is a JSON object with a required ``"script"`` string and an
    optional ``"data"`` object whose keys must be exactly ``_arg1`` ...
    ``_argN``. The script becomes the body of a generated function
    ``_user_script(tabpy, _arg1, ..., _argN)``. That function runs on the
    executor and its return value is written back as JSON.
    """

    def initialize(self, executor, app):
        super(EvaluationPlaneHandler, self).initialize(app)
        self.executor = executor
        self._error_message_timeout = (
            f"User defined script timed out. "
            f"Timeout is set to {self.eval_timeout} s."
        )

    @staticmethod
    def _wrap_user_script(user_code, arguments_str):
        """Return source text defining ``_user_script``.

        The definition is ``def _user_script(tabpy<arguments_str>):``. Each
        line of *user_code* follows, indented by four spaces, and every line
        ends with a newline.
        """
        lines = [f"def _user_script(tabpy{arguments_str}):"]
        lines.extend("    " + line for line in user_code.splitlines())
        return "\n".join(lines) + "\n"

    @gen.coroutine
    def _post_impl(self):
        """Validate the request body, run the user script and write its result.

        Invalid input gets HTTP 400 and a script timeout gets HTTP 408. Any
        other exception (bad JSON, errors raised by the script) propagates to
        ``post``.
        """
        body = json.loads(self.request.body.decode("utf-8"))
        self.logger.log(logging.DEBUG, f"Processing POST request '{body}'...")
        # Only a JSON object can carry a script. A string or number body used
        # to crash the membership test / indexing below with a TypeError (500).
        if not isinstance(body, dict) or "script" not in body:
            self.error_out(400, "Script is empty.")
            return

        user_code = body["script"]
        # An empty/whitespace-only script would yield a function with no body,
        # which exec() rejects with a SyntaxError. A null script would fail on
        # .splitlines(). Both are really "no script".
        if user_code is None or (
            isinstance(user_code, str) and not user_code.strip()
        ):
            self.error_out(400, "Script is empty.")
            return

        # "data" is optional; a missing key and an explicit null both mean
        # "call the script with no extra arguments".
        arguments = body.get("data")
        arguments_str = ""
        if arguments is not None:
            if not isinstance(arguments, dict):
                self.error_out(
                    400, "Script parameters need to be provided as a dictionary."
                )
                return
            args_in = sorted(arguments.keys())
            # The keys must be exactly _arg1.._argN with no gaps or extras.
            expected = sorted(f"_arg{i}" for i in range(1, len(arguments) + 1))
            if args_in != expected:
                self.error_out(
                    400,
                    "Variables names should follow "
                    "the format _arg1, _arg2, _argN",
                )
                return
            # Arguments are passed by keyword, so their order in the generated
            # signature does not matter.
            arguments_str = ", " + ", ".join(args_in)

        # Transforming user script into a proper function.
        function_to_evaluate = self._wrap_user_script(user_code, arguments_str)

        self.logger.log(
            logging.INFO, f"function to evaluate={function_to_evaluate}"
        )

        try:
            result = yield self._call_subprocess(function_to_evaluate, arguments)
        except (
            gen.TimeoutError,
            requests.exceptions.ConnectTimeout,
            requests.exceptions.ReadTimeout,
        ):
            self.logger.log(logging.ERROR, self._error_message_timeout)
            self.error_out(408, self._error_message_timeout)
            return

        if result is not None:
            self.write(simplejson.dumps(result, ignore_nan=True))
        else:
            self.write("null")
        self.finish()

    @gen.coroutine
    def post(self):
        """Handle ``POST /evaluate``: check auth, add CORS header, evaluate.

        Unexpected errors are reported as HTTP 500 with the formatted
        exception. A ``KeyError: 'response'`` is reported as HTTP 404. That
        error presumably comes from a user script indexing the reply of a
        ``tabpy.query`` call whose endpoint did not answer normally.
        """
        if self.should_fail_with_auth_error() != AuthErrorStates.NONE:
            self.fail_with_auth_error()
            return

        self._add_CORS_header()
        try:
            yield self._post_impl()
        except Exception as e:
            err_msg = f"{e.__class__.__name__} : {str(e)}"
            if err_msg != "KeyError : 'response'":
                err_msg = format_exception(e, "POST /evaluate")
                self.error_out(500, "Error processing script", info=err_msg)
            else:
                self.error_out(
                    404,
                    "Error processing script",
                    info="The endpoint you're "
                    "trying to query did not respond. Please make sure the "
                    "endpoint exists and the correct set of arguments are "
                    "provided.",
                )

    @gen.coroutine
    def _call_subprocess(self, function_to_evaluate, arguments):
        """Define ``_user_script`` and run it on the executor with a timeout.

        *function_to_evaluate* is the generated ``def _user_script(...)``
        source. *arguments* is either None or the validated ``_argN`` dict,
        passed as keyword arguments. The result is awaited for at most
        ``self.eval_timeout`` seconds. ``gen.TimeoutError`` is raised when
        that elapses, and exceptions raised by the script propagate.
        """
        restricted_tabpy = RestrictedTabPy(
            self.protocol, self.port, self.logger, self.eval_timeout, self.request.headers
        )
        # Exec does not run the function, so it does not block. The function
        # is defined in this module's globals, so module-level names remain
        # visible to user scripts.
        exec(function_to_evaluate, globals())

        # 'noqa' comments below tell flake8 to ignore undefined _user_script
        # name - the name is actually defined with user script being wrapped
        # in _user_script function (constructed as a string) and then executed
        # with exec() call above.
        # `arguments` is None when the request had no "data". The previous
        # `**arguments if ... else None` unpacked None and raised TypeError,
        # so fall back to an empty mapping instead.
        future = self.executor.submit(
            _user_script,  # noqa: F821
            restricted_tabpy,
            **(arguments or {})
        )

        ret = yield gen.with_timeout(timedelta(seconds=self.eval_timeout), future)
        raise gen.Return(ret)
159
|
|
|
|