Passed
Push — master ( d71f34...c856ab )
by Oleksandr
01:12
created

tabpy_server.handlers.evaluation_plane_handler   A

Complexity

Total Complexity 18

Size/Duplication

Total Lines 133
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 18
eloc 99
dl 0
loc 133
rs 10
c 0
b 0
f 0

5 Methods

Rating   Name   Duplication   Size   Complexity  
A RestrictedTabPy.query() 0 10 1
A EvaluationPlaneHandler.initialize() 0 4 1
A RestrictedTabPy.__init__() 0 5 1
A EvaluationPlaneHandler._call_subprocess() 0 19 2
D EvaluationPlaneHandler.post() 0 70 13
1
from tabpy_server.handlers import BaseHandler
2
import json
3
import logging
4
from tabpy_server.common.util import format_exception
5
import requests
6
from tornado import gen
7
from datetime import timedelta
8
9
10
class RestrictedTabPy:
11
    def __init__(self, protocol, port, logger, timeout):
12
        self.protocol = protocol
13
        self.port = port
14
        self.logger = logger
15
        self.timeout = timeout
16
17
    def query(self, name, *args, **kwargs):
18
        url = f'{self.protocol}://localhost:{self.port}/query/{name}'
19
        self.logger.log(logging.DEBUG, f'Querying {url}...')
20
        internal_data = {'data': args or kwargs}
21
        data = json.dumps(internal_data)
22
        headers = {'content-type': 'application/json'}
23
        response = requests.post(url=url, data=data, headers=headers,
24
                                 timeout=self.timeout,
25
                                 verify=False)
26
        return response.json()
27
28
29
class EvaluationPlaneHandler(BaseHandler):
30
    '''
31
    EvaluationPlaneHandler is responsible for running arbitrary python scripts.
32
    '''
33
34
    def initialize(self, executor, app):
35
        super(EvaluationPlaneHandler, self).initialize(app)
36
        self.executor = executor
37
        self._error_message_timeout = f'User defined script timed out. ' \
38
            f'Timeout is set to {self.eval_timeout} s.'
39
40
    @gen.coroutine
41
    def post(self):
42
        if self.should_fail_with_not_authorized():
43
            self.fail_with_not_authorized()
44
            return
45
46
        self._add_CORS_header()
47
        try:
48
            body = json.loads(self.request.body.decode('utf-8'))
49
            if 'script' not in body:
50
                self.error_out(400, 'Script is empty.')
51
                return
52
53
            # Transforming user script into a proper function.
54
            user_code = body['script']
55
            arguments = None
56
            arguments_str = ''
57
            if 'data' in body:
58
                arguments = body['data']
59
60
            if arguments is not None:
61
                if not isinstance(arguments, dict):
62
                    self.error_out(400, 'Script parameters need to be '
63
                                        'provided as a dictionary.')
64
                    return
65
                else:
66
                    arguments_expected = []
67
                    for i in range(1, len(arguments.keys()) + 1):
68
                        arguments_expected.append('_arg' + str(i))
69
                    if sorted(arguments_expected) == sorted(arguments.keys()):
70
                        arguments_str = ', ' + ', '.join(arguments.keys())
71
                    else:
72
                        self.error_out(400, 'Variables names should follow '
73
                                            'the format _arg1, _arg2, _argN')
74
                        return
75
76
            function_to_evaluate = f'def _user_script(tabpy{arguments_str}):\n'
77
            for u in user_code.splitlines():
78
                function_to_evaluate += ' ' + u + '\n'
79
80
            self.logger.log(
81
                logging.INFO,
82
                f'function to evaluate={function_to_evaluate}')
83
84
            try:
85
                result = yield self._call_subprocess(function_to_evaluate,
86
                                                     arguments)
87
            except (gen.TimeoutError,
88
                    requests.exceptions.ConnectTimeout,
89
                    requests.exceptions.ReadTimeout):
90
                self.logger.log(logging.ERROR, self._error_message_timeout)
91
                self.error_out(408, self._error_message_timeout)
92
                return
93
94
            if result is None:
95
                self.error_out(400, 'Error running script. No return value')
96
            else:
97
                self.write(json.dumps(result))
98
                self.finish()
99
100
        except Exception as e:
101
            err_msg = f'{e.__class__.__name__} : {str(e)}'
102
            if err_msg != "KeyError : 'response'":
103
                err_msg = format_exception(e, 'POST /evaluate')
104
                self.error_out(500, 'Error processing script', info=err_msg)
105
            else:
106
                self.error_out(
107
                    404,
108
                    'Error processing script',
109
                    info="The endpoint you're "
110
                    "trying to query did not respond. Please make sure the "
111
                    "endpoint exists and the correct set of arguments are "
112
                    "provided.")
113
114
    @gen.coroutine
115
    def _call_subprocess(self, function_to_evaluate, arguments):
116
        restricted_tabpy = RestrictedTabPy(
117
            self.protocol,
118
            self.port,
119
            self.logger,
120
            self.eval_timeout)
121
        # Exec does not run the function, so it does not block.
122
        exec(function_to_evaluate, globals())
123
124
        if arguments is None:
125
            future = self.executor.submit(_user_script, restricted_tabpy)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable _user_script does not seem to be defined.
Loading history...
126
        else:
127
            future = self.executor.submit(_user_script, restricted_tabpy,
128
                                          **arguments)
129
130
        ret = yield gen.with_timeout(timedelta(seconds=self.eval_timeout),
131
                                     future)
132
        raise gen.Return(ret)
133