1
|
|
|
from tabpy_server.handlers import BaseHandler |
2
|
|
|
import json |
3
|
|
|
import logging |
4
|
|
|
from tabpy_server.common.util import format_exception |
5
|
|
|
import requests |
6
|
|
|
from tornado import gen |
7
|
|
|
from datetime import timedelta |
8
|
|
|
|
9
|
|
|
|
10
|
|
|
class RestrictedTabPy:
    '''
    Small client for querying endpoints served on localhost.

    EvaluationPlaneHandler passes an instance of this class to the
    generated user function as its first argument (``tabpy``).
    '''

    def __init__(self, protocol, port, logger, timeout):
        # Scheme and port used to build the localhost URL in query().
        self.protocol, self.port = protocol, port
        # Logger used for the debug trace and the timeout handed to
        # requests.post() in query().
        self.logger, self.timeout = logger, timeout

    def query(self, name, *args, **kwargs):
        '''
        POST the arguments to ``/query/<name>`` on localhost and return the
        JSON-decoded reply.

        Positional arguments take precedence: keyword arguments are only
        sent when no positional arguments were given.
        '''
        endpoint = f'{self.protocol}://localhost:{self.port}/query/{name}'
        self.logger.log(logging.DEBUG, f'Querying {endpoint}...')

        payload = json.dumps({'data': args or kwargs})
        # verify=False turns off TLS certificate verification for this
        # request, which always targets localhost.
        reply = requests.post(
            url=endpoint,
            data=payload,
            headers={'content-type': 'application/json'},
            timeout=self.timeout,
            verify=False)
        return reply.json()
27
|
|
|
|
28
|
|
|
|
29
|
|
|
class EvaluationPlaneHandler(BaseHandler):
    '''
    EvaluationPlaneHandler is responsible for running arbitrary python scripts.

    The POST body is JSON with a ``script`` entry (Python source) and an
    optional ``data`` entry, a dictionary keyed ``_arg1`` .. ``_argN``.
    The script is wrapped into a function named ``_user_script``, run on
    the executor, and its return value is written back as JSON.
    '''

    def initialize(self, executor, app):
        '''
        Store the executor and prepare the timeout error message.

        NOTE(review): self.eval_timeout is not assigned in this class;
        presumably BaseHandler.initialize(app) provides it - confirm.
        '''
        super(EvaluationPlaneHandler, self).initialize(app)
        self.executor = executor
        self._error_message_timeout = f'User defined script timed out. ' \
            f'Timeout is set to {self.eval_timeout} s.'

    @staticmethod
    def _is_empty_script(user_code):
        '''Return True for a missing (None) or whitespace-only script.'''
        if user_code is None:
            return True
        return isinstance(user_code, str) and not user_code.strip()

    @staticmethod
    def _arguments_signature(arguments):
        '''
        Return the parameter-list suffix (``', _arg1, _arg2'``) for the
        generated function, or None when the keys of ``arguments`` are not
        exactly ``_arg1`` .. ``_argN``.
        '''
        expected = [f'_arg{i}' for i in range(1, len(arguments) + 1)]
        if sorted(expected) != sorted(arguments.keys()):
            return None
        return ', ' + ', '.join(arguments.keys())

    @staticmethod
    def _build_function_source(user_code, arguments_str):
        '''Wrap the user script lines into a ``_user_script`` function.'''
        header = f'def _user_script(tabpy{arguments_str}):\n'
        # Every script line is prefixed so it becomes part of the function
        # body.
        indented = ''.join(f' {line}\n' for line in user_code.splitlines())
        return header + indented

    @gen.coroutine
    def post(self):
        '''
        Evaluate the posted script and write its result as JSON.

        Responds with 400 for an empty script, malformed arguments or a
        script that returns None, 408 on timeout, 404 when the failure is
        a KeyError on 'response', and 500 for any other exception.
        '''
        if self.should_fail_with_not_authorized():
            self.fail_with_not_authorized()
            return

        self._add_CORS_header()
        try:
            body = json.loads(self.request.body.decode('utf-8'))
            if 'script' not in body:
                self.error_out(400, 'Script is empty.')
                return

            user_code = body['script']
            # A present-but-empty script would otherwise produce a
            # function without a body and surface as a 500 SyntaxError.
            if self._is_empty_script(user_code):
                self.error_out(400, 'Script is empty.')
                return

            arguments = body['data'] if 'data' in body else None
            arguments_str = ''
            if arguments is not None:
                if not isinstance(arguments, dict):
                    self.error_out(400, 'Script parameters need to be '
                                        'provided as a dictionary.')
                    return

                arguments_str = self._arguments_signature(arguments)
                if arguments_str is None:
                    self.error_out(400, 'Variables names should follow '
                                        'the format _arg1, _arg2, _argN')
                    return

            # Transforming user script into a proper function.
            function_to_evaluate = self._build_function_source(
                user_code, arguments_str)

            self.logger.log(
                logging.INFO,
                f'function to evaluate={function_to_evaluate}')

            try:
                result = yield self._call_subprocess(function_to_evaluate,
                                                     arguments)
            except (gen.TimeoutError,
                    requests.exceptions.ConnectTimeout,
                    requests.exceptions.ReadTimeout):
                self.logger.log(logging.ERROR, self._error_message_timeout)
                self.error_out(408, self._error_message_timeout)
                return

            if result is None:
                self.error_out(400, 'Error running script. No return value')
            else:
                self.write(json.dumps(result))
                self.finish()

        except Exception as e:
            err_msg = f'{e.__class__.__name__} : {str(e)}'
            # A KeyError on 'response' is reported as an unresponsive
            # endpoint (404); presumably it comes from a script indexing
            # the reply of tabpy.query() - TODO confirm.
            if err_msg != "KeyError : 'response'":
                err_msg = format_exception(e, 'POST /evaluate')
                self.error_out(500, 'Error processing script', info=err_msg)
            else:
                self.error_out(
                    404,
                    'Error processing script',
                    info="The endpoint you're "
                    "trying to query did not respond. Please make sure the "
                    "endpoint exists and the correct set of arguments are "
                    "provided.")

    @gen.coroutine
    def _call_subprocess(self, function_to_evaluate, arguments):
        '''
        Define the generated function and run it on the executor.

        The function receives a RestrictedTabPy instance as ``tabpy`` plus
        the ``_argN`` keyword arguments. Raises gen.TimeoutError when the
        result is not available within self.eval_timeout seconds.
        '''
        restricted_tabpy = RestrictedTabPy(
            self.protocol,
            self.port,
            self.logger,
            self.eval_timeout)

        # SECURITY(review): this executes source taken from the request
        # body, and it does so in this module's global namespace. Access
        # to this handler must be restricted to trusted callers.
        # Exec does not run the function, so it does not block.
        exec(function_to_evaluate, globals())
        # Fetch the freshly defined function explicitly instead of relying
        # on a global name that does not exist until exec() has run.
        user_script = globals()['_user_script']

        keyword_arguments = {} if arguments is None else arguments
        future = self.executor.submit(user_script, restricted_tabpy,
                                      **keyword_arguments)

        ret = yield gen.with_timeout(timedelta(seconds=self.eval_timeout),
                                     future)
        raise gen.Return(ret)
133
|
|
|
|