Passed
Pull Request — master (#595)
by
unknown
20:22 queued 07:08
created

EvaluationPlaneHandler._post_impl()   D

Complexity

Conditions 13

Size

Total Lines 82
Code Lines 58

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 35
CRAP Score 16.356

Importance

Changes 0
Metric Value
eloc 58
dl 0
loc 82
ccs 35
cts 48
cp 0.7292
rs 4.2
c 0
b 0
f 0
cc 13
nop 1
crap 16.356

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Complexity

Complex classes like tabpy.tabpy_server.handlers.evaluation_plane_handler.EvaluationPlaneHandler._post_impl() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
import pandas
2 1
import pyarrow
3 1
import uuid
4
5 1
from tabpy.tabpy_server.handlers import BaseHandler, arrow_client
6 1
import json
7 1
import simplejson
8 1
import logging
9 1
from tabpy.tabpy_server.common.util import format_exception
10 1
import requests
11 1
from tornado import gen
12 1
from datetime import timedelta
13 1
from tabpy.tabpy_server.handlers.util import AuthErrorStates
14
15 1
class RestrictedTabPy:
    """Restricted ``tabpy`` object passed as the first argument of user scripts.

    Its only capability is :meth:`query`, which POSTs to a ``/query/<name>``
    endpoint on localhost using the configured protocol, port, timeout and
    headers.
    """

    def __init__(self, protocol, port, logger, timeout, headers):
        # Connection settings for calling back into this server.
        self.protocol = protocol
        self.port = port
        self.timeout = timeout
        self.headers = headers
        # Logger used to trace outgoing queries.
        self.logger = logger

    def query(self, name, *args, **kwargs):
        """Call the endpoint ``name`` and return its decoded JSON reply.

        Positional arguments take precedence: they are sent as the ``data``
        payload when present, otherwise the keyword arguments are sent.
        """
        endpoint_url = f"{self.protocol}://localhost:{self.port}/query/{name}"
        self.logger.log(logging.DEBUG, f"Querying {endpoint_url}...")
        payload = json.dumps({"data": args or kwargs})
        # TLS certificate verification is disabled for this localhost call.
        reply = requests.post(
            url=endpoint_url,
            data=payload,
            headers=self.headers,
            timeout=self.timeout,
            verify=False,
        )
        return reply.json()
33
34
35 1
class EvaluationPlaneDisabledHandler(BaseHandler):
    """
    Handler installed when ad-hoc script evaluation is turned off.

    Unauthenticated requests get the standard auth error; every other POST is
    answered with a 404 explaining that ad-hoc scripts are disabled.
    """

    def initialize(self, executor, app):
        super().initialize(app)
        # Stored for parity with EvaluationPlaneHandler; not used by post().
        self.executor = executor

    @gen.coroutine
    def post(self):
        auth_state = self.should_fail_with_auth_error()
        if auth_state == AuthErrorStates.NONE:
            self.error_out(
                404,
                "Ad-hoc scripts have been disabled on this analytics extension, "
                "please contact your administrator.",
            )
        else:
            self.fail_with_auth_error()
51
52
53 1
class EvaluationPlaneHandler(BaseHandler):
    """
    EvaluationPlaneHandler is responsible for running arbitrary python scripts.

    The POST body is a JSON object with a mandatory ``script`` entry (the
    body of the user function) and optionally either:

    * ``dataPath`` - input data is fetched from the Arrow Flight server and
      passed as ``_arg1``; the script result is uploaded back to that server
      and the response is ``{"outputDataPath": <id>}``;
    * ``data`` - a dictionary of ``_arg1`` .. ``_argN`` values sent inline.
    """

    # Arrow Flight server used by the ``dataPath`` scenario.
    _ARROW_FLIGHT_LOCATION = "grpc+tcp://localhost:5005"

    def initialize(self, executor, app):
        super(EvaluationPlaneHandler, self).initialize(app)
        self.executor = executor
        self._error_message_timeout = (
            f"User defined script timed out. "
            f"Timeout is set to {self.eval_timeout} s."
        )

    @gen.coroutine
    def _post_impl(self):
        """Parse the request, run the user script and write its JSON result.

        Client errors are answered with 400 (bad request) or 408 (script
        timeout). Any other exception propagates to :meth:`post`.
        """
        body = json.loads(self.request.body.decode("utf-8"))
        self.logger.log(logging.DEBUG, f"Processing POST request '{body}'...")
        if "script" not in body:
            self.error_out(400, "Script is empty.")
            return

        arguments = self._get_arguments(body)
        if arguments is not None:
            error_message = self._validate_arguments(arguments)
            if error_message is not None:
                self.error_out(400, error_message)
                return

        # Transforming user script into a proper function.
        function_to_evaluate = self._build_user_function(body["script"], arguments)
        self.logger.log(
            logging.INFO, f"function to evaluate={function_to_evaluate}"
        )

        try:
            result = yield self._call_subprocess(function_to_evaluate, arguments)
        except (
            gen.TimeoutError,
            requests.exceptions.ConnectTimeout,
            requests.exceptions.ReadTimeout,
        ):
            self.logger.log(logging.ERROR, self._error_message_timeout)
            self.error_out(408, self._error_message_timeout)
            return

        self.write(self._result_to_response(body, result))
        self.finish()

    def _get_arguments(self, body):
        """Return the script arguments requested by ``body``, or None.

        With ``dataPath`` the data is read from the Arrow Flight server and
        exposed as ``_arg1``. Otherwise the inline ``data`` entry is returned
        as-is; it is validated later.
        """
        if "dataPath" in body:
            # arrow flight scenario
            arrow_data = self.get_arrow_data(body["dataPath"])
            # NOTE(review): when the Flight server yields no data the script
            # runs without arguments - confirm this is intended.
            return None if arrow_data is None else {"_arg1": arrow_data}
        # backward-compatible scenario: data sent inline in the request
        return body.get("data")

    @staticmethod
    def _validate_arguments(arguments):
        """Return an error message for malformed script arguments, or None.

        Arguments must be a dict whose keys are exactly ``_arg1`` .. ``_argN``.
        """
        if not isinstance(arguments, dict):
            return "Script parameters need to be provided as a dictionary."
        expected = {f"_arg{i}" for i in range(1, len(arguments) + 1)}
        if set(arguments) != expected:
            return "Variables names should follow the format _arg1, _arg2, _argN"
        return None

    @staticmethod
    def _build_user_function(user_code, arguments):
        """Return the source of ``def _user_script(tabpy, _arg1, ..., _argN)``.

        Every line of ``user_code`` is indented by one space to form the
        function body. ``arguments`` must be None or already validated.
        """
        arg_count = len(arguments) if arguments else 0
        params = "".join(f", _arg{i}" for i in range(1, arg_count + 1))
        body_lines = "".join(f" {line}\n" for line in user_code.splitlines())
        return f"def _user_script(tabpy{params}):\n{body_lines}"

    def _result_to_response(self, body, result):
        """Serialize ``result`` to the JSON text written back to the client.

        In the ``dataPath`` scenario this has a side effect: the result is
        uploaded to the Arrow Flight server, and only its new id is returned.
        pandas DataFrames are converted to ``{column: [values]}``.
        """
        if result is None:
            return "null"
        if "dataPath" in body:
            # arrow flight scenario
            output_data_id = str(uuid.uuid4())
            self.upload_arrow_data(result, output_data_id, {
                'removeOnDelete': 'True',
                'linkedIDs': body["dataPath"]
            })
            result = {'outputDataPath': output_data_id}
            self.logger.log(logging.INFO, f'outputDataPath={output_data_id}')
        elif isinstance(result, pandas.DataFrame):
            result = result.to_dict(orient='list')
        return simplejson.dumps(result, ignore_nan=True)

    def _get_arrow_client(self):
        """Create a client connected to the Arrow Flight server."""
        # ``pyarrow.flight`` is a submodule that ``import pyarrow`` does not
        # necessarily load, so import it explicitly.
        import pyarrow.flight

        return pyarrow.flight.FlightClient(self._ARROW_FLIGHT_LOCATION)

    def get_arrow_data(self, filename):
        """Fetch the data stored under ``filename`` on the Flight server."""
        return arrow_client.get_flight_by_path(filename, self._get_arrow_client())

    def upload_arrow_data(self, data, filename, metadata):
        """Upload ``data`` under ``filename`` with ``metadata`` to the Flight server."""
        return arrow_client.upload_data(
            self._get_arrow_client(), data, filename, metadata
        )

    @gen.coroutine
    def post(self):
        """Authenticate, then evaluate the script, mapping failures to errors.

        Unexpected exceptions become a 500. The exception
        ``KeyError : 'response'`` becomes a 404 telling the caller that the
        queried endpoint did not respond.
        """
        if self.should_fail_with_auth_error() != AuthErrorStates.NONE:
            self.fail_with_auth_error()
            return

        self._add_CORS_header()
        try:
            yield self._post_impl()
        except Exception as e:
            import traceback

            self.logger.log(logging.ERROR, traceback.format_exc())
            err_msg = f"{e.__class__.__name__} : {str(e)}"
            if err_msg != "KeyError : 'response'":
                err_msg = format_exception(e, "POST /evaluate")
                self.error_out(500, "Error processing script", info=err_msg)
            else:
                self.error_out(
                    404,
                    "Error processing script",
                    info="The endpoint you're "
                    "trying to query did not respond. Please make sure the "
                    "endpoint exists and the correct set of arguments are "
                    "provided.",
                )

    @gen.coroutine
    def _call_subprocess(self, function_to_evaluate, arguments):
        """Define the user function, run it on the executor and return its result.

        Raises ``gen.TimeoutError`` if the script exceeds ``eval_timeout``.
        """
        restricted_tabpy = RestrictedTabPy(
            self.protocol, self.port, self.logger, self.eval_timeout, self.request.headers
        )
        # SECURITY NOTE: this executes arbitrary client-supplied code by
        # design; only authenticated requests reach this point (see post()).
        # exec only defines the function, it does not run it, so it does not
        # block. There is no yield between the exec and the lookup below.
        exec(function_to_evaluate, globals())
        user_script = globals()["_user_script"]

        # ``arguments`` is None when the request carried no data; unpacking
        # None with ``**`` would raise TypeError, so fall back to no kwargs.
        future = self.executor.submit(
            user_script, restricted_tabpy, **(arguments or {})
        )

        ret = yield gen.with_timeout(timedelta(seconds=self.eval_timeout), future)
        raise gen.Return(ret)
211