1
|
1 |
|
import pandas |
2
|
1 |
|
import pyarrow |
3
|
1 |
|
import uuid |
4
|
|
|
|
5
|
1 |
|
from tabpy.tabpy_server.handlers import BaseHandler |
6
|
1 |
|
import json |
7
|
1 |
|
import simplejson |
8
|
1 |
|
import logging |
9
|
1 |
|
from tabpy.tabpy_server.common.util import format_exception |
10
|
1 |
|
import requests |
11
|
1 |
|
from tornado import gen |
12
|
1 |
|
from datetime import timedelta |
13
|
1 |
|
from tabpy.tabpy_server.handlers.util import AuthErrorStates |
14
|
|
|
|
15
|
1 |
|
class RestrictedTabPy:
    """
    Small HTTP client for the ``/query/<name>`` endpoints on localhost.

    It stores the connection settings it is given and exposes a single
    ``query`` method that POSTs a JSON payload and returns the decoded reply.
    """

    def __init__(self, protocol, port, logger, timeout, headers):
        """
        Store the connection settings used by ``query``.

        Parameters
        ----------
        protocol : URL scheme placed in front of ``://localhost``.
        port : port number placed after ``localhost:``.
        logger : object whose ``log(level, message)`` method is called.
        timeout : value forwarded as ``timeout`` to ``requests.post``.
        headers : headers forwarded unchanged to ``requests.post``.
        """
        # Request-shaping settings.
        self.headers = headers
        self.timeout = timeout
        # Where to send the request.
        self.protocol = protocol
        self.port = port
        # Diagnostics.
        self.logger = logger

    def query(self, name, *args, **kwargs):
        """
        POST the given arguments to the endpoint called ``name``.

        Positional arguments win: the keyword arguments are only sent when
        no positional argument was supplied (``args or kwargs``).

        Returns the JSON-decoded body of the response.
        """
        endpoint = f"{self.protocol}://localhost:{self.port}/query/{name}"
        self.logger.log(logging.DEBUG, f"Querying {endpoint}...")

        payload = json.dumps({"data": args or kwargs})
        reply = requests.post(
            url=endpoint,
            data=payload,
            headers=self.headers,
            timeout=self.timeout,
            # Certificate verification is switched off for this call.
            verify=False,
        )
        return reply.json()
33
|
|
|
|
34
|
|
|
|
35
|
1 |
|
class EvaluationPlaneDisabledHandler(BaseHandler):
    """
    Answers POST requests with an error saying ad-hoc scripts are disabled.
    """

    def initialize(self, executor, app):
        """Initialize the base handler with ``app`` and keep the executor."""
        super().initialize(app)
        self.executor = executor

    @gen.coroutine
    def post(self):
        """
        Reject the request.

        An authentication failure is reported first; otherwise the caller
        gets a 404 explaining that ad-hoc scripts are turned off.
        """
        auth_state = self.should_fail_with_auth_error()
        if auth_state != AuthErrorStates.NONE:
            self.fail_with_auth_error()
        else:
            disabled_message = (
                "Ad-hoc scripts have been disabled on this analytics "
                "extension, please contact your administrator."
            )
            self.error_out(404, disabled_message)
51
|
|
|
|
52
|
|
|
|
53
|
1 |
|
class EvaluationPlaneHandler(BaseHandler):
    """
    EvaluationPlaneHandler is responsible for running arbitrary python scripts.

    The script found in the request body is wrapped into a function named
    ``_user_script`` whose parameters are ``tabpy`` followed by the supplied
    ``_arg1 .. _argN`` values. That function is submitted to the executor and
    awaited with a timeout of ``self.eval_timeout`` seconds.
    """

    def initialize(self, executor, app):
        """Initialize the base handler and keep the executor and arrow server."""
        super().initialize(app)
        self.arrow_server = app.arrow_server
        self.executor = executor
        self._error_message_timeout = (
            "User defined script timed out. "
            f"Timeout is set to {self.eval_timeout} s."
        )

    @gen.coroutine
    def _post_impl(self):
        """
        Parse the request, run the user script and write the response.

        Responds with 400 when the script or its arguments are malformed and
        with 408 when the script times out. Any other exception propagates to
        the caller.
        """
        body = json.loads(self.request.body.decode("utf-8"))
        self.logger.log(logging.DEBUG, "Processing POST request...")
        if "script" not in body:
            self.error_out(400, "Script is empty.")
            return

        # Transforming user script into a proper function.
        user_code = body["script"]
        use_arrow = self.arrow_server is not None and "dataPath" in body
        arguments = None
        if use_arrow:
            # Arrow flight scenario: the data was uploaded beforehand and the
            # request only carries the path it was stored under.
            self.logger.log(logging.DEBUG, "arrow flight scenario")
            arrow_data = self.get_arrow_data(body["dataPath"])
            # NOTE(review): get_arrow_data returns '' (not None) when nothing
            # is found, so this check also passes in that case - confirm
            # whether a missing data path should be rejected instead.
            if arrow_data is not None:
                arguments = {"_arg1": arrow_data}
        elif "data" in body:
            # Arguments sent inline in the request body.
            arguments = body["data"]

        arguments_str = ""
        if arguments is not None:
            if not isinstance(arguments, dict):
                self.error_out(
                    400, "Script parameters need to be provided as a dictionary."
                )
                return
            args_in = sorted(arguments.keys())
            expected = sorted(f"_arg{i + 1}" for i in range(len(arguments)))
            if expected != args_in:
                self.error_out(
                    400,
                    "Variables names should follow "
                    "the format _arg1, _arg2, _argN",
                )
                return
            arguments_str = "".join(f", {arg_name}" for arg_name in args_in)

        # Indent every script line so it becomes the body of the function.
        function_to_evaluate = (
            f"def _user_script(tabpy{arguments_str}):\n"
            + "".join(f" {line}\n" for line in user_code.splitlines())
        )

        self.logger.log(
            logging.INFO, f"function to evaluate={function_to_evaluate}"
        )

        try:
            result = yield self._call_subprocess(function_to_evaluate, arguments)
        except (
            gen.TimeoutError,
            requests.exceptions.ConnectTimeout,
            requests.exceptions.ReadTimeout,
        ):
            self.logger.log(logging.ERROR, self._error_message_timeout)
            self.error_out(408, self._error_message_timeout)
            return

        if result is not None:
            if use_arrow:
                # Arrow flight scenario: store the result on the arrow server
                # and reply with the identifier it was stored under.
                output_data_id = str(uuid.uuid4())
                self.upload_arrow_data(result, output_data_id, {
                    'removeOnDelete': 'True',
                    'linkedIDs': body["dataPath"],
                })
                result = {'outputDataPath': output_data_id}
                self.logger.log(logging.WARN, f'outputDataPath={output_data_id}')
            elif isinstance(result, pandas.DataFrame):
                result = result.to_dict(orient='list')
            self.write(simplejson.dumps(result, ignore_nan=True))
        else:
            self.write("null")
        self.finish()

    @staticmethod
    def _flight_key(descriptor):
        """Build the key under which a flight descriptor's table is stored."""
        return (
            descriptor.descriptor_type.value,
            descriptor.command,
            tuple(descriptor.path or tuple()),
        )

    def get_arrow_data(self, filename):
        """
        Take the table stored under ``filename`` off the arrow server.

        The table is removed from ``self.arrow_server.flights`` (``pop``) and
        returned converted with ``to_pandas()``. When the flight info lists no
        endpoint location, an empty string is returned.
        """
        descriptor = pyarrow.flight.FlightDescriptor.for_path(filename)
        info = self.arrow_server.get_flight_info(None, descriptor)
        for endpoint in info.endpoints:
            self.logger.log(logging.DEBUG, f"Ticket: {endpoint.ticket}")
            for location in endpoint.locations:
                self.logger.log(logging.DEBUG, f"{location}")
                # The first location found is enough: fetch and return.
                key = self._flight_key(descriptor)
                return self.arrow_server.flights.pop(key).to_pandas()
        self.logger.log(logging.DEBUG, "no data found for get")
        return ''

    def upload_arrow_data(self, data, filename, metadata):
        """
        Store ``data`` as an arrow table on the arrow server under ``filename``.

        ``data`` is converted with ``pyarrow.table``. When ``metadata`` is not
        None its entries are added to the table's schema metadata.
        """
        my_table = pyarrow.table(data)
        if metadata is not None:
            # Arrow tables and schemas are immutable, so the table has to be
            # rebound to the copy that carries the metadata. Existing schema
            # metadata is kept and the new entries are layered on top.
            combined = dict(my_table.schema.metadata or {})
            combined.update(metadata)
            my_table = my_table.replace_schema_metadata(combined)
        self.logger.log(
            logging.DEBUG, f"Uploading table, rows={my_table.num_rows}"
        )
        descriptor = pyarrow.flight.FlightDescriptor.for_path(filename)
        self.arrow_server.flights[self._flight_key(descriptor)] = my_table

    @gen.coroutine
    def post(self):
        """
        Entry point for POST requests.

        Fails early on authentication errors, otherwise delegates to
        ``_post_impl`` and turns any exception into a 500 response (or a 404
        for the specific ``KeyError : 'response'`` case).
        """
        if self.should_fail_with_auth_error() != AuthErrorStates.NONE:
            self.fail_with_auth_error()
            return

        self._add_CORS_header()
        try:
            yield self._post_impl()
        except Exception as e:
            import traceback
            self.logger.log(logging.ERROR, traceback.format_exc())
            err_msg = f"{e.__class__.__name__} : {str(e)}"
            if err_msg != "KeyError : 'response'":
                err_msg = format_exception(e, "POST /evaluate")
                self.error_out(500, "Error processing script", info=err_msg)
            else:
                self.error_out(
                    404,
                    "Error processing script",
                    info="The endpoint you're "
                    "trying to query did not respond. Please make sure the "
                    "endpoint exists and the correct set of arguments are "
                    "provided.",
                )

    @gen.coroutine
    def _call_subprocess(self, function_to_evaluate, arguments):
        """
        Define the wrapped user function and run it on the executor.

        ``arguments`` is either None or a dict of ``_argN`` values that are
        passed to the function as keyword arguments. Raises
        ``gen.TimeoutError`` when the function does not finish within
        ``self.eval_timeout`` seconds.
        """
        restricted_tabpy = RestrictedTabPy(
            self.protocol, self.port, self.logger, self.eval_timeout, self.request.headers
        )
        # SECURITY: this executes code taken from the request body. That is
        # what this handler is for, but it must only be reachable by trusted
        # callers.
        # Exec does not run the function, so it does not block.
        # NOTE(review): the function lands in this module's globals and is
        # looked up right below with no yield in between - presumably another
        # request cannot redefine it in that window; confirm.
        exec(function_to_evaluate, globals())

        # 'noqa' comment below tells flake8 to ignore undefined _user_script
        # name - the name is actually defined with user script being wrapped
        # in _user_script function (constructed as a string) and then executed
        # with exec() call above.
        # A missing arguments dict must expand to "no keyword arguments";
        # expanding None with ** raises TypeError.
        keyword_arguments = arguments if arguments is not None else {}
        future = self.executor.submit(
            _user_script,  # noqa: F821
            restricted_tabpy,
            **keyword_arguments,
        )

        ret = yield gen.with_timeout(timedelta(seconds=self.eval_timeout), future)
        raise gen.Return(ret)
212
|
|
|
|