PythonService._load_object()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 35
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 16.5853

Importance

Changes 0
Metric Value
eloc 26
dl 0
loc 35
ccs 1
cts 13
cp 0.0769
rs 9.256
c 0
b 0
f 0
cc 4
nop 6
crap 16.5853
1 1
import concurrent.futures
2 1
import logging
3 1
from tabpy.tabpy_server.common.util import format_exception
4 1
from tabpy.tabpy_server.common.messages import (
5
    LoadObject,
6
    DeleteObjects,
7
    FlushObjects,
8
    CountObjects,
9
    ListObjects,
10
    UnknownMessage,
11
    LoadFailed,
12
    ObjectsDeleted,
13
    ObjectsFlushed,
14
    QueryFailed,
15
    QuerySuccessful,
16
    UnknownURI,
17
    DownloadSkipped,
18
    LoadInProgress,
19
    ObjectCount,
20
    ObjectList,
21
)
22 1
from tabpy.tabpy_tools.query_object import QueryObject
23
24
25 1
# Module-level logger, named after this module, shared by the classes below.
logger = logging.getLogger(__name__)
26
27
28 1
class PythonServiceHandler:
    """
    Request front-end for a PythonService object: inspects each incoming
    message and invokes the service method that matches its type.
    """

    def __init__(self, ps):
        self.ps = ps

    def _dispatch(self, msg):
        """Call the service method matching the type of `msg`.

        Returns whatever the service method returns, or an `UnknownMessage`
        wrapping `msg` when its type is not one of the handled ones.
        """
        if isinstance(msg, LoadObject):
            # The fields of the message are passed positionally.
            return self.ps.load_object(*msg)
        if isinstance(msg, DeleteObjects):
            return self.ps.delete_objects(msg.uris)
        if isinstance(msg, FlushObjects):
            return self.ps.flush_objects()
        if isinstance(msg, CountObjects):
            return self.ps.count_objects()
        if isinstance(msg, ListObjects):
            return self.ps.list_objects()
        return UnknownMessage(msg)

    def manage_request(self, msg):
        """Process one request message and return the response object.

        Any exception raised while handling the request is logged and
        reported back as an `UnknownMessage` carrying the exception's
        `message` attribute when it has one, otherwise the exception itself.
        """
        try:
            logger.debug(f"Received request {type(msg).__name__}")
            reply = self._dispatch(msg)
            logger.debug(f"Returning response {reply}")
            return reply
        except Exception as exc:
            logger.exception(exc)
            detail = getattr(exc, "message", exc)
            logger.error(f"Error processing request: {detail}")
            return UnknownMessage(detail)
62
63
64 1
class PythonService:
    """
    Registry of the query objects known to the current TabPy instance.

    `query_objects` is a dictionary that maps a query object URI to a
    record describing that object. Each record has the following schema:

    {'version': <version-recorded-for-the-object>,
     'last_error': <most-recent-load-error-or-None>,
     'endpoint_obj': <loaded-query-object-or-None>,
     'type': <object-type>,
     'status': <LoadSuccessful-or-LoadFailed-or-LoadInProgress>}

    Loading is submitted to `EXECUTOR`, a thread pool with a single worker,
    so `load_object` returns before the object is actually loaded.
    """

    def __init__(self, query_objects=None):
        """Create the service.

        Args:
            query_objects: optional existing URI -> record dictionary to use
                as the registry. The dictionary is used as-is (not copied),
                including when it is empty; a new one is created only when
                None is given.
        """
        self.EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        # `is None` rather than `or {}`: an empty dict supplied by the caller
        # must still be the very object the service stores into.
        self.query_objects = {} if query_objects is None else query_objects

    @staticmethod
    def _make_entry(version, object_type, endpoint_obj, status, last_error):
        """Build one registry record following the class-level schema."""
        return {
            "version": version,
            "type": object_type,
            "endpoint_obj": endpoint_obj,
            "status": status,
            "last_error": last_error,
        }

    def _load_object(
        self, object_uri, object_url, object_version, is_update, object_type
    ):
        """Load one object and store the outcome in `query_objects`.

        This is the callable that `load_object` submits to `EXECUTOR`.
        A "model" is loaded with `QueryObject.load(object_url)`; for an
        "alias" the URL itself is stored as the endpoint object. Any other
        type, or any exception raised while loading, results in a
        "LoadFailed" record whose `last_error` holds the error text.
        `is_update` is only written to the log here.

        Nothing is returned and no exception is propagated for a failed
        load; the record is the only result.
        """
        try:
            logger.info(
                f"Loading object:, URI={object_uri}, "
                f"URL={object_url}, version={object_version}, "
                f"is_updated={is_update}"
            )
            if object_type == "model":
                po = QueryObject.load(object_url)
            elif object_type == "alias":
                po = object_url
            else:
                raise RuntimeError(f"Unknown object type: {object_type}")

            self.query_objects[object_uri] = self._make_entry(
                object_version, object_type, po, "LoadSuccessful", None
            )
        except Exception as e:
            logger.exception(e)
            logger.error(
                f"Unable to load QueryObject: path={object_url}, error={str(e)}"
            )

            self.query_objects[object_uri] = self._make_entry(
                object_version,
                object_type,
                None,
                "LoadFailed",
                f"Load failed: {str(e)}",
            )

    def load_object(
        self, object_uri, object_url, object_version, is_update, object_type
    ):
        """Schedule loading of an object unless it is already loaded.

        Returns:
            DownloadSkipped: the URI already has an endpoint object whose
                recorded version is greater than or equal to
                `object_version`; nothing is scheduled.
            LoadInProgress: the record was marked "LoadInProgress" (created
                if the URI was unknown) and the load was submitted to
                `EXECUTOR`.
            LoadFailed: scheduling itself raised; the record is replaced by
                a "LoadFailed" one carrying the error text.
        """
        try:
            obj_info = self.query_objects.get(object_uri)
            if (
                obj_info
                and obj_info["endpoint_obj"]
                and obj_info["version"] >= object_version
            ):
                logger.info("Received load message for object already loaded")

                return DownloadSkipped(
                    object_uri,
                    obj_info["version"],
                    "Object with greater or equal version already loaded",
                )

            if object_uri not in self.query_objects:
                self.query_objects[object_uri] = self._make_entry(
                    object_version, object_type, None, "LoadInProgress", None
                )
            else:
                # Keep the existing record (and its endpoint object) in place
                # while the new version loads; only the status changes.
                self.query_objects[object_uri]["status"] = "LoadInProgress"

            self.EXECUTOR.submit(
                self._load_object,
                object_uri,
                object_url,
                object_version,
                is_update,
                object_type,
            )

            return LoadInProgress(
                object_uri, object_url, object_version, is_update, object_type
            )
        except Exception as e:
            logger.exception(e)
            logger.error(
                f"Unable to load QueryObject: path={object_url}, error={str(e)}"
            )

            self.query_objects[object_uri] = self._make_entry(
                object_version, object_type, None, "LoadFailed", str(e)
            )

            return LoadFailed(object_uri, object_version, str(e))

    def delete_objects(self, object_uris):
        """Delete one or more objects from the query_objects map.

        Args:
            object_uris: a single URI string, or a list of URIs (each list
                element is handled by a recursive call).

        Returns:
            ObjectsDeleted listing the URIs that were actually removed. The
            list is empty for an unknown URI or for input that is neither a
            list nor a str; both cases are logged.
        """
        if isinstance(object_uris, list):
            deleted = []
            for uri in object_uris:
                deleted.extend(self.delete_objects(uri).uris)
            return ObjectsDeleted(deleted)

        if isinstance(object_uris, str):
            # Compare against None instead of testing truthiness so that a
            # removed record is always reported, whatever its content.
            if self.query_objects.pop(object_uris, None) is not None:
                return ObjectsDeleted([object_uris])

            logger.warning(
                f"Received message to delete query object "
                f"that doesn't exist: "
                f"object_uris={object_uris}"
            )
            return ObjectsDeleted([])

        logger.error(
            f"Unexpected input to delete objects: input={object_uris}, "
            f'info="Input should be list or str. '
            f'Type: {type(object_uris)}"'
        )
        return ObjectsDeleted([])

    def flush_objects(self):
        """Flush objects from the query_objects map.

        Returns ObjectsFlushed built from the number of records removed and
        a constant 0 (the meaning of the second field is defined by the
        message type, not here).
        """
        logger.debug("Flushing query objects")
        n = len(self.query_objects)
        self.query_objects.clear()
        return ObjectsFlushed(n, 0)

    def count_objects(self):
        """Count the records whose endpoint object is not None.

        Records that are still loading or whose load failed hold None as
        their endpoint object and are therefore not counted.
        """
        count = sum(
            1
            for obj_info in self.query_objects.values()
            if obj_info["endpoint_obj"] is not None
        )
        return ObjectCount(count)

    def list_objects(self):
        """List every record as URI -> version/type/status/reason.

        `reason` carries the record's `last_error` value.
        """
        objects = {
            uri: {
                "version": obj_info["version"],
                "type": obj_info["type"],
                "status": obj_info["status"],
                "reason": obj_info["last_error"],
            }
            for uri, obj_info in self.query_objects.items()
        }

        return ObjectList(objects)

    def query(self, object_uri, params, uid):
        """Execute a QueryObject query.

        Args:
            object_uri: URI of the object to query.
            params: dict (passed as keyword arguments) or list (passed as
                positional arguments) for the endpoint's `query` method.
            uid: not used by this method.

        Returns:
            QuerySuccessful with the URI, recorded version and result;
            UnknownURI when the URI has no record; QueryFailed when `params`
            has the wrong type, the record has no endpoint object, or the
            query raised.
        """
        logger.debug(f"Querying Python service {object_uri}...")
        try:
            if not isinstance(params, (dict, list)):
                return QueryFailed(
                    uri=object_uri,
                    error=(
                        "Query parameter needs to be a dictionary or a list"
                        f". Given value is of type {type(params)}"
                    ),
                )

            obj_info = self.query_objects.get(object_uri)
            logger.debug(f"Found object {obj_info}")
            if not obj_info:
                return UnknownURI(object_uri)

            pred_obj = obj_info["endpoint_obj"]
            version = obj_info["version"]

            if not pred_obj:
                return QueryFailed(
                    uri=object_uri,
                    error=(
                        "There is no query object associated to the "
                        f"endpoint: {object_uri}"
                    ),
                )

            logger.debug(f"Querying endpoint with params ({params})...")
            if isinstance(params, dict):
                result = pred_obj.query(**params)
            else:
                result = pred_obj.query(*params)

            return QuerySuccessful(object_uri, version, result)
        except Exception as e:
            logger.exception(e)
            # format_exception is a project helper; presumably it renders the
            # exception together with the "/query" context - confirm there.
            err_msg = format_exception(e, "/query")
            logger.error(err_msg)
            return QueryFailed(uri=object_uri, error=err_msg)
276