Passed
Push — master ( 227024...1d0698 )
by Oleksandr
02:44
created

tabpy.tabpy_server.psws.python_service   A

Complexity

Total Complexity 38

Size/Duplication

Total Lines 232
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 38
eloc 160
dl 0
loc 232
rs 9.36
c 0
b 0
f 0

10 Methods

Rating   Name   Duplication   Size   Complexity  
B PythonService.query() 0 35 7
A PythonService.__init__() 0 5 1
A PythonService.list_objects() 0 11 2
A PythonService._load_object() 0 29 4
A PythonServiceHandler.__init__() 0 2 1
B PythonService.load_object() 0 43 6
B PythonServiceHandler.manage_request() 0 24 8
A PythonService.flush_objects() 0 6 1
A PythonService.delete_objects() 0 22 5
A PythonService.count_objects() 0 7 3
1
import concurrent.futures
2
import logging
3
import sys
4
5
6
from tabpy.tabpy_tools.query_object import QueryObject
7
from tabpy.tabpy_server.common.util import format_exception
8
from tabpy.tabpy_server.common.messages import (
9
    LoadObject, DeleteObjects, FlushObjects, CountObjects, ListObjects,
10
    UnknownMessage, LoadFailed, ObjectsDeleted, ObjectsFlushed, QueryFailed,
11
    QuerySuccessful, UnknownURI, DownloadSkipped, LoadInProgress, ObjectCount,
12
    ObjectList)
13
14
15
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
16
17
18
class PythonServiceHandler:
    """
    Thin front end for a PythonService object: it takes an incoming message,
    invokes the service method that matches the message type and hands back
    whatever that method returned.
    """

    def __init__(self, ps):
        """Remember the service object (`ps`) that requests are routed to."""
        self.ps = ps

    def manage_request(self, msg):
        """
        Route `msg` to the matching service method and return its response.

        The message type selects the call: LoadObject -> load_object (the
        message is unpacked into positional arguments), DeleteObjects ->
        delete_objects(msg.uris), FlushObjects -> flush_objects,
        CountObjects -> count_objects, ListObjects -> list_objects. Any other
        message is answered with UnknownMessage(msg).

        This method does not propagate exceptions raised while handling the
        message: they are logged and an UnknownMessage carrying the
        exception's `message` attribute (or the exception itself when it has
        none) is returned instead.
        """
        try:
            logger.debug(f'Received request {type(msg).__name__}')

            # The checks are kept in their original order.
            if isinstance(msg, LoadObject):
                reply = self.ps.load_object(*msg)
            elif isinstance(msg, DeleteObjects):
                reply = self.ps.delete_objects(msg.uris)
            elif isinstance(msg, FlushObjects):
                reply = self.ps.flush_objects()
            elif isinstance(msg, CountObjects):
                reply = self.ps.count_objects()
            elif isinstance(msg, ListObjects):
                reply = self.ps.list_objects()
            else:
                reply = UnknownMessage(msg)

            logger.debug(f'Returning response {reply}')
            return reply
        except Exception as e:
            # Prefer the exception's own `message` attribute when present.
            detail = getattr(e, 'message', e)
            logger.error(f'Error processing request: {detail}')
            return UnknownMessage(detail)
50
51
52
class PythonService:
    """
    This class is a simple wrapper maintaining loaded query objects from
    the current TabPy instance. `query_objects` is a dictionary that
    maps query object URI to a record describing that query object.

    The schema of each record is as follows:

    {'version': <current-successfully-loaded-version>,
     'last_error': <your-recent-error-to-load-model>,
     'endpoint_obj': <loaded_query_objects>,
     'type': <object-type>,
     'status': <LoadSuccessful-or-LoadFailed-or-LoadInProgress>}

    An endpoint counts as loaded when its 'endpoint_obj' is not None.
    Loading itself is submitted to a single-worker thread pool (`EXECUTOR`).
    """

    # Sentinel that distinguishes "key absent" from any stored value.
    _MISSING = object()

    def __init__(self,
                 query_objects=None):
        """
        Create the service.

        query_objects: optional initial URI -> record mapping. A falsy value
        (None or an empty mapping) results in a fresh empty dict.
        """
        self.EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        self.query_objects = query_objects or {}

    @staticmethod
    def _make_entry(version, object_type, endpoint_obj, status, last_error):
        """Build one `query_objects` record with the documented schema."""
        return {'version': version,
                'type': object_type,
                'endpoint_obj': endpoint_obj,
                'status': status,
                'last_error': last_error}

    def _load_object(self, object_uri, object_url, object_version, is_update,
                     object_type):
        """
        Load one object and store the outcome under `object_uri`.

        'model' objects are loaded with QueryObject.load(object_url); for
        'alias' objects the URL itself is stored as the endpoint object. Any
        other type, or any exception while loading, is logged and recorded as
        a 'LoadFailed' record with the error text. Nothing is returned and
        exceptions raised while loading are not propagated. `is_update` is
        only included in the log line.
        """
        try:
            logger.info(
                f'Loading object:, URI={object_uri}, '
                f'URL={object_url}, version={object_version}, '
                f'is_updated={is_update}')
            if object_type == 'model':
                po = QueryObject.load(object_url)
            elif object_type == 'alias':
                po = object_url
            else:
                raise RuntimeError(f'Unknown object type: {object_type}')

            self.query_objects[object_uri] = self._make_entry(
                object_version, object_type, po, 'LoadSuccessful', None)
        except Exception as e:
            logger.error(f'Unable to load QueryObject: path={object_url}, '
                         f'error={str(e)}')

            self.query_objects[object_uri] = self._make_entry(
                object_version, object_type, None, 'LoadFailed',
                f'Load failed: {str(e)}')

    def load_object(self, object_uri, object_url, object_version, is_update,
                    object_type):
        """
        Request a load of the object identified by `object_uri`.

        Returns DownloadSkipped when an endpoint object is already loaded
        with a version greater than or equal to `object_version`. Otherwise
        the record is marked 'LoadInProgress', the actual load is submitted
        to the executor and LoadInProgress is returned. If anything here
        raises, the record is replaced by a 'LoadFailed' one and LoadFailed
        is returned.
        """
        try:
            obj_info = self.query_objects.get(object_uri)
            # "Loaded" means endpoint_obj is not None, the same test that
            # count_objects() applies.
            already_loaded = bool(obj_info) and (
                obj_info['endpoint_obj'] is not None)
            if already_loaded and obj_info['version'] >= object_version:
                logger.info(
                    "Received load message for object already loaded")

                return DownloadSkipped(
                    object_uri, obj_info['version'], "Object with greater "
                    "or equal version already loaded")

            if object_uri not in self.query_objects:
                self.query_objects[object_uri] = self._make_entry(
                    object_version, object_type, None, 'LoadInProgress',
                    None)
            else:
                # Keep the existing record (and any endpoint it holds) in
                # place until the new load finishes; only flag the status.
                self.query_objects[object_uri]['status'] = 'LoadInProgress'

            self.EXECUTOR.submit(
                self._load_object, object_uri, object_url,
                object_version, is_update, object_type)

            return LoadInProgress(
                object_uri, object_url, object_version, is_update,
                object_type)
        except Exception as e:
            logger.error(f'Unable to load QueryObject: path={object_url}, '
                         f'error={str(e)}')

            self.query_objects[object_uri] = self._make_entry(
                object_version, object_type, None, 'LoadFailed', str(e))

            return LoadFailed(object_uri, object_version, str(e))

    def delete_objects(self, object_uris):
        """
        Delete one or more objects from the query_objects map.

        object_uris: a single URI string or a list of them. Returns
        ObjectsDeleted with the URIs that were actually removed; unknown
        URIs and inputs of any other type are logged and contribute nothing.
        """
        if isinstance(object_uris, list):
            deleted = []
            for uri in object_uris:
                deleted.extend(self.delete_objects(uri).uris)
            return ObjectsDeleted(deleted)

        if isinstance(object_uris, str):
            # Decide on key presence, not on the truthiness of the stored
            # value, so that a falsy record still counts as deleted.
            removed = self.query_objects.pop(object_uris, self._MISSING)
            if removed is not self._MISSING:
                return ObjectsDeleted([object_uris])

            logger.warning(f'Received message to delete query object '
                           f'that doesn\'t exist: '
                           f'object_uris={object_uris}')
            return ObjectsDeleted([])

        logger.error(
            f'Unexpected input to delete objects: input={object_uris}, '
            f'info="Input should be list or str. '
            f'Type: {type(object_uris)}"')
        return ObjectsDeleted([])

    def flush_objects(self):
        """
        Flush objects from the query_objects map.

        Returns ObjectsFlushed whose first argument is the number of records
        that were present before clearing; the second argument is always 0
        (NOTE(review): its meaning is defined by ObjectsFlushed, not here).
        """
        logger.debug("Flushing query objects")
        n = len(self.query_objects)
        self.query_objects.clear()
        return ObjectsFlushed(n, 0)

    def count_objects(self):
        """Count the records whose 'endpoint_obj' is not None."""
        count = sum(
            1 for obj_info in self.query_objects.values()
            if obj_info['endpoint_obj'] is not None)
        return ObjectCount(count)

    def list_objects(self):
        """
        List the known objects.

        Returns ObjectList wrapping a dict that maps each URI to its
        'version', 'type', 'status' and 'reason' (the record's 'last_error').
        """
        objects = {
            uri: {'version': obj_info['version'],
                  'type': obj_info['type'],
                  'status': obj_info['status'],
                  'reason': obj_info['last_error']}
            for uri, obj_info in self.query_objects.items()}

        return ObjectList(objects)

    def query(self, object_uri, params, uid):
        """
        Execute a QueryObject query.

        params: a dict (passed to the endpoint's query() as keyword
        arguments) or a list (passed as positional arguments). `uid` is
        accepted but not used here.

        Returns QuerySuccessful on success, UnknownURI when `object_uri` has
        no (non-empty) record, and QueryFailed when `params` has the wrong
        type, when no endpoint object is loaded, or when the query raises.
        """
        logger.debug(f'Querying Python service {object_uri}...')
        try:
            if not isinstance(params, (dict, list)):
                return QueryFailed(
                    uri=object_uri,
                    error=('Query parameter needs to be a dictionary or a list'
                           f'. Given value is of type {type(params)}'))

            obj_info = self.query_objects.get(object_uri)
            logger.debug(f'Found object {obj_info}')
            if not obj_info:
                return UnknownURI(object_uri)

            pred_obj = obj_info['endpoint_obj']
            version = obj_info['version']

            # Same "loaded" test as count_objects(): None means not loaded.
            if pred_obj is None:
                return QueryFailed(
                    uri=object_uri,
                    error=("There is no query object associated to the "
                           f'endpoint: {object_uri}'))

            logger.debug(f'Qurying endpoint with params ({params})...')
            if isinstance(params, dict):
                result = pred_obj.query(**params)
            else:
                result = pred_obj.query(*params)

            return QuerySuccessful(object_uri, version, result)
        except Exception as e:
            err_msg = format_exception(e, '/query')
            logger.error(err_msg)
            return QueryFailed(uri=object_uri, error=err_msg)
232