EndpointHandler._delete_po_future()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1.216

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 5
ccs 2
cts 5
cp 0.4
rs 10
c 0
b 0
f 0
cc 1
nop 2
crap 1.216
1
"""
2
HTTP handeler to serve specific endpoint request like
3
http://myserver:9004/endpoints/mymodel
4
5
For how generic endpoints requests is served look
6
at endpoints_handler.py
7
"""
8
9 1
import json
10 1
import logging
11 1
import shutil
12 1
from tabpy.tabpy_server.common.util import format_exception
13 1
from tabpy.tabpy_server.handlers import ManagementHandler
14 1
from tabpy.tabpy_server.handlers.base_handler import STAGING_THREAD
15 1
from tabpy.tabpy_server.management.state import get_query_object_path
16 1
from tabpy.tabpy_server.psws.callbacks import on_state_change
17 1
from tabpy.tabpy_server.handlers.util import AuthErrorStates
18 1
from tornado import gen
19
20
21 1
class EndpointHandler(ManagementHandler):
22 1
    def initialize(self, app):
23 1
        super(EndpointHandler, self).initialize(app)
24
25 1
    def get(self, endpoint_name):
26 1
        if self.should_fail_with_auth_error() != AuthErrorStates.NONE:
27 1
            self.fail_with_auth_error()
28 1
            return
29
30 1
        self.logger.log(logging.DEBUG, f"Processing GET for /endpoints/{endpoint_name}")
31
32 1
        self._add_CORS_header()
33 1
        if not endpoint_name:
34 1
            self.write(json.dumps(self.tabpy_state.get_endpoints()))
35
        else:
36 1
            if endpoint_name in self.tabpy_state.get_endpoints():
37
                self.write(json.dumps(self.tabpy_state.get_endpoints()[endpoint_name]))
38
            else:
39 1
                self.error_out(
40
                    404,
41
                    "Unknown endpoint",
42
                    info=f"Endpoint {endpoint_name} is not found",
43
                )
44
45 1
    @gen.coroutine
46 1
    def put(self, name):
47
        if self.should_fail_with_auth_error() != AuthErrorStates.NONE:
48
            self.fail_with_auth_error()
49
            return
50
51
        self.logger.log(logging.DEBUG, f"Processing PUT for /endpoints/{name}")
52
53
        try:
54
            if not self.request.body:
55
                self.error_out(400, "Input body cannot be empty")
56
                self.finish()
57
                return
58
            try:
59
                request_data = json.loads(self.request.body.decode("utf-8"))
60
            except BaseException as ex:
61
                self.error_out(
62
                    400, log_message="Failed to decode input body", info=str(ex)
63
                )
64
                self.finish()
65
                return
66
67
            # check if endpoint exists
68
            endpoints = self.tabpy_state.get_endpoints(name)
69
            if len(endpoints) == 0:
70
                self.error_out(404, f"endpoint {name} does not exist.")
71
                self.finish()
72
                return
73
74
            version_after_update = int(endpoints[name]["version"])
75
            if request_data.get('should_update_version'):
76
                version_after_update += 1
77
            self.logger.log(logging.INFO, f"Endpoint info: {request_data}")
78
            err_msg = yield self._add_or_update_endpoint(
79
                "update", name, version_after_update, request_data
80
            )
81
            if err_msg:
82
                self.error_out(400, err_msg)
83
                self.finish()
84
            else:
85
                self.write(self.tabpy_state.get_endpoints(name))
86
                self.finish()
87
88
        except Exception as e:
89
            err_msg = format_exception(e, "update_endpoint")
90
            self.error_out(500, err_msg)
91
            self.finish()
92
93 1
    @gen.coroutine
94 1
    def delete(self, name):
95
        if self.should_fail_with_auth_error() != AuthErrorStates.NONE:
96
            self.fail_with_auth_error()
97
            return
98
99
        self.logger.log(logging.DEBUG, f"Processing DELETE for /endpoints/{name}")
100
101
        try:
102
            endpoints = self.tabpy_state.get_endpoints(name)
103
            if len(endpoints) == 0:
104
                self.error_out(404, f"endpoint {name} does not exist.")
105
                self.finish()
106
                return
107
108
            # update state
109
            try:
110
                endpoint_info = self.tabpy_state.delete_endpoint(name)
111
            except Exception as e:
112
                self.error_out(400, f"Error when removing endpoint: {e.message}")
113
                self.finish()
114
                return
115
116
            # delete files
117
            if endpoint_info["type"] != "alias":
118
                query_path = get_query_object_path(
119
                    self.settings["state_file_path"], name, None
120
                )
121
                staging_path = query_path.replace("/query_objects/", "/staging/endpoints/")
122
                try:
123
                    yield self._delete_po_future(query_path)
124
                    yield self._delete_po_future(staging_path)
125
                except Exception as e:
126
                    self.error_out(400, f"Error while deleting: {e}")
127
                    self.finish()
128
                    return
129
130
            self.set_status(204)
131
            self.finish()
132
133
        except Exception as e:
134
            err_msg = format_exception(e, "delete endpoint")
135
            self.error_out(500, err_msg)
136
            self.finish()
137
138
        on_state_change(
139
            self.settings, self.tabpy_state, self.python_service, self.logger
140
        )
141
142 1
    @gen.coroutine
143 1
    def _delete_po_future(self, delete_path):
144
        future = STAGING_THREAD.submit(shutil.rmtree, delete_path)
145
        ret = yield future
146
        raise gen.Return(ret)
147