Passed
Pull Request — master (#644)
by
unknown
21:04 queued 04:15
created

tabpy.tabpy_server.handlers.management_handler   A

Complexity

Total Complexity 20

Size/Duplication

Total Lines 151
Duplicated Lines 0 %

Test Coverage

Coverage 25.92%

Importance

Changes 0
Metric Value
wmc 20
eloc 105
dl 0
loc 151
ccs 21
cts 81
cp 0.2592
rs 10
c 0
b 0
f 0

4 Methods

Rating   Name   Duplication   Size   Complexity  
A ManagementHandler._get_protocol() 0 2 1
A ManagementHandler.initialize() 0 3 1
A ManagementHandler._copy_po_future() 0 7 1
D ManagementHandler._add_or_update_endpoint() 0 98 12

1 Function

Rating   Name   Duplication   Size   Complexity  
A copy_from_local() 0 19 5
1 1
import logging
2 1
import os
3 1
import shutil
4 1
from re import compile as _compile
5 1
from uuid import uuid4 as random_uuid
6
7 1
from tornado import gen
8
9 1
from tabpy.tabpy_server.app.app_parameters import SettingsParameters
10 1
from tabpy.tabpy_server.handlers import BaseHandler
11 1
from tabpy.tabpy_server.handlers.base_handler import STAGING_THREAD
12 1
from tabpy.tabpy_server.management.state import get_query_object_path
13 1
from tabpy.tabpy_server.psws.callbacks import on_state_change
14
15
16 1
def copy_from_local(localpath, remotepath, is_dir=False):
    """
    Copy a file or a folder from ``localpath`` to ``remotepath``.

    Parameters
    ----------
    localpath : str
        Source file, or source folder when ``is_dir`` is true.
    remotepath : str
        Destination path. For a folder copy it may or may not exist yet;
        when it exists, the source content is merged into it.
    is_dir : bool
        When true, ``localpath`` is treated as a folder and copied
        recursively; otherwise a single file is copied with ``shutil.copy``.

    Raises
    ------
    OSError
        Propagated from ``os``/``shutil`` when a path cannot be read or
        written.
    """
    if not is_dir:
        shutil.copy(localpath, remotepath)
        return

    if not os.path.exists(remotepath):
        # Destination folder is absent: copytree creates it and copies
        # the whole tree in one go.
        shutil.copytree(localpath, remotepath)
        return

    # Destination folder already exists: merge the source into it entry
    # by entry.
    for entry_name in os.listdir(localpath):
        source_entry = os.path.join(localpath, entry_name)
        if os.path.isdir(source_entry):
            # Recurse instead of calling copytree directly: copytree raises
            # FileExistsError when the sub-folder is already present at the
            # destination, which made merging into a non-flat folder fail.
            copy_from_local(
                source_entry, os.path.join(remotepath, entry_name), is_dir=True
            )
        else:
            shutil.copy(source_entry, remotepath)
35
36
37 1
class ManagementHandler(BaseHandler):
    """
    Handler base class holding the shared add/update endpoint logic.
    """

    # Validation patterns are compiled once for the class instead of on
    # every request.
    _ENDPOINT_NAME_PATTERN = _compile(r"^[a-zA-Z0-9-_\s]+$")
    _SOURCE_PATH_PATTERN = _compile(r"^[\\\:a-zA-Z0-9-_~\s/\.\(\)]+$")

    def initialize(self, app):
        """
        Initialize the base handler and store the configured port.
        """
        super().initialize(app)
        self.port = self.settings[SettingsParameters.Port]

    def _get_protocol(self):
        """
        Return the URL scheme prefix, always ``"http://"``.
        """
        return "http://"

    @staticmethod
    def _decode_docstring(raw_docstring):
        """
        Expand backslash escape sequences in a docstring.

        The text is encoded as Latin-1 with ``backslashreplace`` before the
        ``unicode_escape`` decode. ``unicode_escape`` reads raw bytes as
        Latin-1, so encoding as UTF-8 first (as was done previously) turned
        every non-ASCII character into mojibake. With this encoding,
        characters up to U+00FF round-trip as single bytes and anything
        higher is written as a ``\\uXXXX``/``\\UXXXXXXXX`` escape that the
        decode restores. ASCII input is handled exactly as before.

        NOTE(review): a literal backslash placed directly before a character
        above U+00FF is still not round-tripped faithfully.
        """
        return bytes(raw_docstring, "latin-1", "backslashreplace").decode(
            "unicode_escape"
        )

    def _check_src_path(self, name, src_path):
        """
        Validate ``src_path``; return an error message, or None when valid.
        """
        if not isinstance(src_path, str):
            return "src_path must be a string."
        if not self._SOURCE_PATH_PATTERN.match(src_path):
            return f"Invalid source path for endpoint {name}"
        return None

    @gen.coroutine
    def _add_or_update_endpoint(self, action, name, version, request_data):
        """
        Add or update an endpoint

        Parameters
        ----------
        action : str
            ``"add"`` registers a new endpoint; any other value updates an
            existing one.
        name : str
            Endpoint name; only letters, digits, underscore, hyphens and
            whitespace are accepted.
        version
            Passed to ``get_query_object_path`` and, on update, to
            ``update_endpoint``.
        request_data : dict
            Request payload. Keys read: ``docstring``, ``description``,
            ``type``, ``methods``, ``dependencies``, ``target``, ``schema``
            and ``src_path``.

        Returns
        -------
        Resolves to an error message string when validation or the state
        change fails, and to None on success.

        Raises
        ------
        TypeError
            If ``name`` is not a string.
        RuntimeError
            If another add/update is already in progress.
        """
        self.logger.log(logging.DEBUG, f"Adding/updating model {name}...")

        if not isinstance(name, str):
            msg = "Endpoint name must be a string"
            self.logger.log(logging.CRITICAL, msg)
            raise TypeError(msg)

        if not self._ENDPOINT_NAME_PATTERN.match(name):
            raise gen.Return(
                "endpoint name can only contain: a-z, A-Z, 0-9,"
                " underscore, hyphens and spaces."
            )

        if self.settings.get("add_or_updating_endpoint"):
            msg = (
                "Another endpoint update is already in progress"
                ", please wait a while and try again"
            )
            self.logger.log(logging.CRITICAL, msg)
            raise RuntimeError(msg)

        # Mark an update as in progress; the finally clause below clears the
        # marker on every exit path, including the gen.Return early exits.
        self.settings["add_or_updating_endpoint"] = random_uuid()
        try:
            docstring = None
            if "docstring" in request_data:
                docstring = self._decode_docstring(request_data["docstring"])

            endpoint_type = request_data.get("type", None)
            dependencies = request_data.get("dependencies", None)
            target = request_data.get("target", None)
            src_path = request_data.get("src_path", None)
            target_path = get_query_object_path(
                self.settings[SettingsParameters.StateFilePath], name, version
            )

            # copy from staging
            if src_path:
                path_error = self._check_src_path(name, src_path)
                if path_error is not None:
                    raise gen.Return(path_error)

                yield self._copy_po_future(src_path, target_path)
            elif endpoint_type != "alias":
                raise gen.Return("src_path is required to add/update an endpoint.")
            else:
                # alias special logic:
                if not target:
                    raise gen.Return("Target is required for alias endpoint.")
                dependencies = [target]

            # Keyword arguments shared by add_endpoint and update_endpoint.
            endpoint_kwargs = dict(
                name=name,
                description=request_data.get("description", None),
                docstring=docstring,
                endpoint_type=endpoint_type,
                methods=request_data.get("methods", []),
                dependencies=dependencies,
                target=target,
                schema=request_data.get("schema", None),
            )

            # update local config
            try:
                if action == "add":
                    self.tabpy_state.add_endpoint(**endpoint_kwargs)
                else:
                    self.tabpy_state.update_endpoint(
                        version=version, **endpoint_kwargs
                    )
            except Exception as e:
                # Failures are reported to the caller as a message rather
                # than propagated as an exception.
                raise gen.Return(f"Error when changing TabPy state: {e}")

            on_state_change(
                self.settings, self.tabpy_state, self.python_service, self.logger
            )

        finally:
            self.settings["add_or_updating_endpoint"] = None

    @gen.coroutine
    def _copy_po_future(self, src_path, target_path):
        """
        Copy the folder ``src_path`` to ``target_path`` on STAGING_THREAD.

        Submits ``copy_from_local`` with ``is_dir=True`` and resolves to
        whatever that call returns once the submitted work has finished.
        """
        future = STAGING_THREAD.submit(
            copy_from_local, src_path, target_path, is_dir=True
        )
        ret = yield future
        raise gen.Return(ret)
151