Passed
Pull Request — master (#644)
by
unknown
21:04 queued 04:15
created

tabpy.tabpy_server.handlers.endpoint_handler   A

Complexity

Total Complexity 20

Size/Duplication

Total Lines 143
Duplicated Lines 0 %

Test Coverage

Coverage 30.21%

Importance

Changes 0
Metric Value
wmc 20
eloc 104
dl 0
loc 143
ccs 29
cts 96
cp 0.3021
rs 10
c 0
b 0
f 0

5 Methods

Rating   Name   Duplication   Size   Complexity  
A EndpointHandler.initialize() 0 2 1
A EndpointHandler.get() 0 18 4
B EndpointHandler.put() 0 45 7
B EndpointHandler.delete() 0 45 7
A EndpointHandler._delete_po_future() 0 5 1
1
"""
2
HTTP handeler to serve specific endpoint request like
3
http://myserver:9004/endpoints/mymodel
4
5
For how generic endpoints requests is served look
6
at endpoints_handler.py
7
"""
8
9 1
import json
10 1
import logging
11 1
import shutil
12 1
from tabpy.tabpy_server.common.util import format_exception
13 1
from tabpy.tabpy_server.handlers import ManagementHandler
14 1
from tabpy.tabpy_server.handlers.base_handler import STAGING_THREAD
15 1
from tabpy.tabpy_server.management.state import get_query_object_path
16 1
from tabpy.tabpy_server.psws.callbacks import on_state_change
17 1
from tabpy.tabpy_server.handlers.util import AuthErrorStates
18 1
from tornado import gen
19
20
21 1
class EndpointHandler(ManagementHandler):
22 1
    def initialize(self, app):
23 1
        super(EndpointHandler, self).initialize(app)
24
25 1
    def get(self, endpoint_name):
26 1
        if self.should_fail_with_auth_error() != AuthErrorStates.NONE:
27 1
            self.fail_with_auth_error()
28 1
            return
29
30 1
        self.logger.log(logging.DEBUG, f"Processing GET for /endpoints/{endpoint_name}")
31
32 1
        self._add_CORS_header()
33 1
        if not endpoint_name:
34 1
            self.write(json.dumps(self.tabpy_state.get_endpoints()))
35
        else:
36 1
            if endpoint_name in self.tabpy_state.get_endpoints():
37
                self.write(json.dumps(self.tabpy_state.get_endpoints()[endpoint_name]))
38
            else:
39 1
                self.error_out(
40
                    404,
41
                    "Unknown endpoint",
42
                    info=f"Endpoint {endpoint_name} is not found",
43
                )
44
45 1
    @gen.coroutine
46 1
    def put(self, name):
47
        if self.should_fail_with_auth_error() != AuthErrorStates.NONE:
48
            self.fail_with_auth_error()
49
            return
50
51
        self.logger.log(logging.DEBUG, f"Processing PUT for /endpoints/{name}")
52
53
        try:
54
            if not self.request.body:
55
                self.error_out(400, "Input body cannot be empty")
56
                self.finish()
57
                return
58
            try:
59
                request_data = json.loads(self.request.body.decode("utf-8"))
60
            except BaseException as ex:
61
                self.error_out(
62
                    400, log_message="Failed to decode input body", info=str(ex)
63
                )
64
                self.finish()
65
                return
66
67
            # check if endpoint exists
68
            endpoints = self.tabpy_state.get_endpoints(name)
69
            if len(endpoints) == 0:
70
                self.error_out(404, f"endpoint {name} does not exist.")
71
                self.finish()
72
                return
73
74
            new_version = int(endpoints[name]["version"]) + 1
75
            self.logger.log(logging.INFO, f"Endpoint info: {request_data}")
76
            err_msg = yield self._add_or_update_endpoint(
77
                "update", name, new_version, request_data
78
            )
79
            if err_msg:
80
                self.error_out(400, err_msg)
81
                self.finish()
82
            else:
83
                self.write(self.tabpy_state.get_endpoints(name))
84
                self.finish()
85
86
        except Exception as e:
87
            err_msg = format_exception(e, "update_endpoint")
88
            self.error_out(500, err_msg)
89
            self.finish()
90
91 1
    @gen.coroutine
92 1
    def delete(self, name):
93
        if self.should_fail_with_auth_error() != AuthErrorStates.NONE:
94
            self.fail_with_auth_error()
95
            return
96
97
        self.logger.log(logging.DEBUG, f"Processing DELETE for /endpoints/{name}")
98
99
        try:
100
            endpoints = self.tabpy_state.get_endpoints(name)
101
            if len(endpoints) == 0:
102
                self.error_out(404, f"endpoint {name} does not exist.")
103
                self.finish()
104
                return
105
106
            # update state
107
            try:
108
                endpoint_info = self.tabpy_state.delete_endpoint(name)
109
            except Exception as e:
110
                self.error_out(400, f"Error when removing endpoint: {e.message}")
111
                self.finish()
112
                return
113
114
            # delete files
115
            if endpoint_info["type"] != "alias":
116
                delete_path = get_query_object_path(
117
                    self.settings["state_file_path"], name, None
118
                )
119
                try:
120
                    yield self._delete_po_future(delete_path)
121
                except Exception as e:
122
                    self.error_out(400, f"Error while deleting: {e}")
123
                    self.finish()
124
                    return
125
126
            self.set_status(204)
127
            self.finish()
128
129
        except Exception as e:
130
            err_msg = format_exception(e, "delete endpoint")
131
            self.error_out(500, err_msg)
132
            self.finish()
133
134
        on_state_change(
135
            self.settings, self.tabpy_state, self.python_service, self.logger
136
        )
137
138 1
    @gen.coroutine
139 1
    def _delete_po_future(self, delete_path):
140
        future = STAGING_THREAD.submit(shutil.rmtree, delete_path)
141
        ret = yield future
142
        raise gen.Return(ret)
143