Passed
Push — master ( 53016e...38fde4 )
by Oleksandr
02:52
created

EvaluationPlaneHandler._post_impl()   C

Complexity

Conditions 9

Size

Total Lines 57
Code Lines 43

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 43
dl 0
loc 57
rs 6.5146
c 0
b 0
f 0
cc 9
nop 1

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
from tabpy.tabpy_server.handlers import BaseHandler
2
import json
3
import simplejson
4
import logging
5
from tabpy.tabpy_server.common.util import format_exception
6
import requests
7
from tornado import gen
8
from datetime import timedelta
9
10
11
class RestrictedTabPy:
12
    def __init__(self, protocol, port, logger, timeout):
13
        self.protocol = protocol
14
        self.port = port
15
        self.logger = logger
16
        self.timeout = timeout
17
18
    def query(self, name, *args, **kwargs):
19
        url = f"{self.protocol}://localhost:{self.port}/query/{name}"
20
        self.logger.log(logging.DEBUG, f"Querying {url}...")
21
        internal_data = {"data": args or kwargs}
22
        data = json.dumps(internal_data)
23
        headers = {"content-type": "application/json"}
24
        response = requests.post(
25
            url=url, data=data, headers=headers, timeout=self.timeout, verify=False
26
        )
27
        return response.json()
28
29
30
class EvaluationPlaneHandler(BaseHandler):
31
    """
32
    EvaluationPlaneHandler is responsible for running arbitrary python scripts.
33
    """
34
35
    def initialize(self, executor, app):
36
        super(EvaluationPlaneHandler, self).initialize(app)
37
        self.executor = executor
38
        self._error_message_timeout = (
39
            f"User defined script timed out. "
40
            f"Timeout is set to {self.eval_timeout} s."
41
        )
42
43
    @gen.coroutine
44
    def _post_impl(self):
45
        body = json.loads(self.request.body.decode("utf-8"))
46
        self.logger.log(logging.DEBUG, f"Processing POST request '{body}'...")
47
        if "script" not in body:
48
            self.error_out(400, "Script is empty.")
49
            return
50
51
        # Transforming user script into a proper function.
52
        user_code = body["script"]
53
        arguments = None
54
        arguments_str = ""
55
        if "data" in body:
56
            arguments = body["data"]
57
58
        if arguments is not None:
59
            if not isinstance(arguments, dict):
60
                self.error_out(
61
                    400, "Script parameters need to be provided as a dictionary."
62
                )
63
                return
64
            args_in = sorted(arguments.keys())
65
            n = len(arguments)
66
            if sorted('_arg'+str(i+1) for i in range(n)) == args_in:
67
                arguments_str = ", " + ", ".join(args_in)
68
            else:
69
                self.error_out(
70
                    400,
71
                    "Variables names should follow "
72
                    "the format _arg1, _arg2, _argN",
73
                )
74
                return
75
76
        function_to_evaluate = f"def _user_script(tabpy{arguments_str}):\n"
77
        for u in user_code.splitlines():
78
            function_to_evaluate += " " + u + "\n"
79
80
        self.logger.log(
81
            logging.INFO, f"function to evaluate={function_to_evaluate}"
82
        )
83
84
        try:
85
            result = yield self._call_subprocess(function_to_evaluate, arguments)
86
        except (
87
            gen.TimeoutError,
88
            requests.exceptions.ConnectTimeout,
89
            requests.exceptions.ReadTimeout,
90
        ):
91
            self.logger.log(logging.ERROR, self._error_message_timeout)
92
            self.error_out(408, self._error_message_timeout)
93
            return
94
95
        if result is not None:
96
            self.write(simplejson.dumps(result, ignore_nan=True))
97
        else:
98
            self.write("null")
99
        self.finish()
100
101
    @gen.coroutine
102
    def post(self):
103
        if self.should_fail_with_not_authorized():
104
            self.fail_with_not_authorized()
105
            return
106
107
        self._add_CORS_header()
108
        try:
109
            yield self._post_impl()
110
        except Exception as e:
111
            err_msg = f"{e.__class__.__name__} : {str(e)}"
112
            if err_msg != "KeyError : 'response'":
113
                err_msg = format_exception(e, "POST /evaluate")
114
                self.error_out(500, "Error processing script", info=err_msg)
115
            else:
116
                self.error_out(
117
                    404,
118
                    "Error processing script",
119
                    info="The endpoint you're "
120
                    "trying to query did not respond. Please make sure the "
121
                    "endpoint exists and the correct set of arguments are "
122
                    "provided.",
123
                )
124
125
    @gen.coroutine
126
    def _call_subprocess(self, function_to_evaluate, arguments):
127
        restricted_tabpy = RestrictedTabPy(
128
            self.protocol, self.port, self.logger, self.eval_timeout
129
        )
130
        # Exec does not run the function, so it does not block.
131
        exec(function_to_evaluate, globals())
132
133
        # 'noqa' comments below tell flake8 to ignore undefined _user_script
134
        # name - the name is actually defined with user script being wrapped
135
        # in _user_script function (constructed as a striong) and then executed
136
        # with exec() call above.
137
        if arguments is None:
138
            future = self.executor.submit(_user_script,  # noqa: F821
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable _user_script does not seem to be defined.
Loading history...
139
                                          restricted_tabpy)
140
        else:
141
            future = self.executor.submit(_user_script,  # noqa: F821
142
                                          restricted_tabpy, **arguments)
143
144
        ret = yield gen.with_timeout(timedelta(seconds=self.eval_timeout), future)
145
        raise gen.Return(ret)
146