Passed
Push — master ( f89136...5df0e0 )
by Oleksandr
11:44
created

tabpy.tabpy_server.management.state   F

Complexity

Total Complexity 100

Size/Duplication

Total Lines 644
Duplicated Lines 0 %

Test Coverage

Coverage 41.03%

Importance

Changes 0
Metric Value
wmc 100
eloc 373
dl 0
loc 644
rs 2
c 0
b 0
f 0
ccs 119
cts 290
cp 0.4103

31 Methods

Rating   Name   Duplication   Size   Complexity  
A TabPyState.__init__() 0 3 1
A TabPyState.get_endpoints() 0 51 4
A TabPyState.set_config() 0 11 3
B TabPyState.add_endpoint() 0 66 5
A TabPyState.get_description() 0 12 2
A TabPyState._check_and_set_endpoint_docstring() 0 2 1
A TabPyState.name() 0 11 2
A TabPyState.creation_time() 0 13 2
A TabPyState._remove_config_option() 0 14 3
B TabPyState._get_config_value() 0 27 5
A TabPyState.get_access_control_allow_headers() 0 12 2
A TabPyState._has_config_value() 0 4 2
A TabPyState._check_and_set_dependencies() 0 8 4
A TabPyState._set_config_value() 0 20 4
A TabPyState.get_access_control_allow_methods() 0 12 2
A TabPyState._check_and_set_endpoint_description() 0 2 1
A TabPyState._check_endpoint_exists() 0 6 4
A TabPyState.get_access_control_allow_origin() 0 13 2
C TabPyState.delete_endpoint() 0 55 9
A TabPyState._check_and_set_endpoint_type() 0 3 1
A TabPyState._write_state() 0 6 1
A TabPyState.set_description() 0 18 3
A TabPyState._add_update_endpoints_config() 0 22 3
C TabPyState.update_endpoint() 0 82 9
A TabPyState._get_config_items() 0 4 2
A TabPyState._set_revision_number() 0 12 3
A TabPyState._check_and_set_endpoint_str_value() 0 8 5
A TabPyState.get_revision_number() 0 10 2
A TabPyState.set_name() 0 16 3
A TabPyState._increase_revision_number() 0 5 2
A TabPyState._check_target() 0 3 3

3 Functions

Rating   Name   Duplication   Size   Complexity  
A state_lock() 0 14 1
A _get_root_path() 0 5 2
A get_query_object_path() 0 12 2

How to fix   Complexity   

Complexity

Complex classes like tabpy.tabpy_server.management.state often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
try:
2 1
    from ConfigParser import ConfigParser
3 1
except ImportError:
4 1
    from configparser import ConfigParser
5 1
import json
6 1
import logging
7 1
from tabpy.tabpy_server.management.util import write_state_config
8 1
from threading import Lock
9 1
from time import time
10
11
12 1
logger = logging.getLogger(__name__)
13
14
# State File Config Section Names
15 1
_DEPLOYMENT_SECTION_NAME = "Query Objects Service Versions"
16 1
_QUERY_OBJECT_DOCSTRING = "Query Objects Docstrings"
17 1
_SERVICE_INFO_SECTION_NAME = "Service Info"
18 1
_META_SECTION_NAME = "Meta"
19
20
# Directory Names
21 1
_QUERY_OBJECT_DIR = "query_objects"
22
23
"""
24
Lock to change the TabPy State.
25
"""
26 1
_PS_STATE_LOCK = Lock()
27
28
29 1
def state_lock(func):
30
    """
31
    Mutex for changing PS state
32
    """
33
34 1
    def wrapper(self, *args, **kwargs):
35 1
        try:
36 1
            _PS_STATE_LOCK.acquire()
37 1
            return func(self, *args, **kwargs)
38
        finally:
39
            # ALWAYS RELEASE LOCK
40 1
            _PS_STATE_LOCK.release()
41
42 1
    return wrapper
43
44
45 1
def _get_root_path(state_path):
46 1
    if state_path[-1] != "/":
47 1
        state_path += "/"
48
49 1
    return state_path
50
51
52 1
def get_query_object_path(state_file_path, name, version):
53
    """
54
    Returns the query object path
55
56
    If the version is None, a path without the version will be returned.
57
    """
58 1
    root_path = _get_root_path(state_file_path)
59 1
    sub_path = [_QUERY_OBJECT_DIR, name]
60 1
    if version is not None:
61 1
        sub_path.append(str(version))
62 1
    full_path = root_path + "/".join(sub_path)
63 1
    return full_path
64
65
66 1
class TabPyState:
67
    """
68
    The TabPy state object that stores attributes
69
    about this TabPy and perform GET/SET on these
70
    attributes.
71
72
    Attributes:
73
        - name
74
        - description
75
        - endpoints (name, description, docstring, version, target)
76
        - revision number
77
78
    When the state object is initialized, the state is saved as a ConfigParser.
79
    There is a config to any attribute.
80
81
    """
82
83 1
    def __init__(self, settings, config=None):
84 1
        self.settings = settings
85 1
        self.set_config(config, _update=False)
86
87 1
    @state_lock
88 1
    def set_config(self, config, logger=logging.getLogger(__name__), _update=True):
89
        """
90
        Set the local ConfigParser manually.
91
        This new ConfigParser will be used as current state.
92
        """
93 1
        if not isinstance(config, ConfigParser):
94
            raise ValueError("Invalid config")
95 1
        self.config = config
96 1
        if _update:
97
            self._write_state(logger)
98
99 1
    def get_endpoints(self, name=None):
100
        """
101
        Return a dictionary of endpoints
102
103
        Parameters
104
        ----------
105
        name : str
106
            The name of the endpoint.
107
            If "name" is specified, only the information about that endpoint
108
            will be returned.
109
110
        Returns
111
        -------
112
        endpoints : dict
113
            The dictionary containing information about each endpoint.
114
            The keys are the endpoint names.
115
            The values for each include:
116
                - description
117
                - doc string
118
                - type
119
                - target
120
121
        """
122 1
        endpoints = {}
123 1
        try:
124 1
            endpoint_names = self._get_config_value(_DEPLOYMENT_SECTION_NAME, name)
125
        except Exception as e:
126
            logger.error(f"error in get_endpoints: {str(e)}")
127
            return {}
128
129 1
        if name:
130
            endpoint_info = json.loads(endpoint_names)
131
            docstring = self._get_config_value(_QUERY_OBJECT_DOCSTRING, name)
132
            endpoint_info["docstring"] = str(
133
                bytes(docstring, "utf-8").decode("unicode_escape")
134
            )
135
            endpoints = {name: endpoint_info}
136
        else:
137 1
            for endpoint_name in endpoint_names:
138 1
                endpoint_info = json.loads(
139
                    self._get_config_value(_DEPLOYMENT_SECTION_NAME, endpoint_name)
140
                )
141 1
                docstring = self._get_config_value(
142
                    _QUERY_OBJECT_DOCSTRING, endpoint_name, True, ""
143
                )
144 1
                endpoint_info["docstring"] = str(
145
                    bytes(docstring, "utf-8").decode("unicode_escape")
146
                )
147 1
                endpoints[endpoint_name] = endpoint_info
148 1
        logger.debug(f"Collected endpoints: {endpoints}")
149 1
        return endpoints
150
151 1
    def _check_endpoint_exists(self, name):
152
        endpoints = self.get_endpoints()
153
        if not name or not isinstance(name, str) or len(name) == 0:
154
            raise ValueError("name of the endpoint must be a valid string.")
155
156
        return name in endpoints
157
158 1
    def _check_and_set_endpoint_str_value(self, param, paramName, defaultValue):
159
        if not param and defaultValue is not None:
160
            return defaultValue
161
162
        if not param or not isinstance(param, str):
163
            raise ValueError(f"{paramName} must be a string.")
164
165
        return param
166
167 1
    def _check_and_set_endpoint_description(self, description, defaultValue):
168
        return self._check_and_set_endpoint_str_value(description, "description", defaultValue)
169
170 1
    def _check_and_set_endpoint_docstring(self, docstring, defaultValue):
171
        return self._check_and_set_endpoint_str_value(docstring, "docstring", defaultValue)
172
173 1
    def _check_and_set_endpoint_type(self, endpoint_type, defaultValue):
174
        return self._check_and_set_endpoint_str_value(
175
            endpoint_type, "endpoint type", defaultValue)
176
177 1
    def _check_target(self, target):
178
        if target and not isinstance(target, str):
179
            raise ValueError("target must be a string.")
180
181 1
    def _check_and_set_dependencies(self, dependencies, defaultValue):
182
        if not dependencies:
183
            return defaultValue
184
185
        if dependencies or not isinstance(dependencies, list):
186
            raise ValueError("dependencies must be a list.")
187
188
        return dependencies
189
190 1
    @state_lock
191 1
    def add_endpoint(
192
        self,
193
        name,
194
        description=None,
195
        docstring=None,
196
        endpoint_type=None,
197
        methods=None,
198
        target=None,
199
        dependencies=None,
200
        schema=None,
201
    ):
202
        """
203
        Add a new endpoint to the TabPy.
204
205
        Parameters
206
        ----------
207
        name : str
208
            Name of the endpoint
209
        description : str, optional
210
            Description of this endpoint
211
        doc_string : str, optional
212
            The doc string for this endpoint, if needed.
213
        endpoint_type : str
214
            The endpoint type (model, alias)
215
        target : str, optional
216
            The target endpoint name for the alias to be added.
217
218
        Note:
219
        The version of this endpoint will be set to 1 since it is a new
220
        endpoint.
221
222
        """
223
        try:
224
            if (self._check_endpoint_exists(name)):
225
                raise ValueError(f"endpoint {name} already exists.")
226
227
            endpoints = self.get_endpoints()
228
229
            description = self._check_and_set_endpoint_description(description, "")
230
            docstring = self._check_and_set_endpoint_docstring(
231
                docstring, "-- no docstring found in query function --")
232
            endpoint_type = self._check_and_set_endpoint_type(endpoint_type, None)
233
            dependencies = self._check_and_set_dependencies(dependencies, [])
234
235
            self._check_target(target)
236
            if target and target not in endpoints:
237
                raise ValueError("target endpoint is not valid.")
238
239
            endpoint_info = {
240
                "description": description,
241
                "docstring": docstring,
242
                "type": endpoint_type,
243
                "version": 1,
244
                "dependencies": dependencies,
245
                "target": target,
246
                "creation_time": int(time()),
247
                "last_modified_time": int(time()),
248
                "schema": schema,
249
            }
250
251
            endpoints[name] = endpoint_info
252
            self._add_update_endpoints_config(endpoints)
253
        except Exception as e:
254
            logger.error(f"Error in add_endpoint: {e}")
255
            raise
256
257 1
    def _add_update_endpoints_config(self, endpoints):
258
        # save the endpoint info to config
259
        dstring = ""
260
        for endpoint_name in endpoints:
261
            try:
262
                info = endpoints[endpoint_name]
263
                dstring = str(
264
                    bytes(info["docstring"], "utf-8").decode("unicode_escape")
265
                )
266
                self._set_config_value(
267
                    _QUERY_OBJECT_DOCSTRING,
268
                    endpoint_name,
269
                    dstring,
270
                    _update_revision=False,
271
                )
272
                del info["docstring"]
273
                self._set_config_value(
274
                    _DEPLOYMENT_SECTION_NAME, endpoint_name, json.dumps(info)
275
                )
276
            except Exception as e:
277
                logger.error(f"Unable to write endpoints config: {e}")
278
                raise
279
280 1
    @state_lock
281 1
    def update_endpoint(
282
        self,
283
        name,
284
        description=None,
285
        docstring=None,
286
        endpoint_type=None,
287
        version=None,
288
        methods=None,
289
        target=None,
290
        dependencies=None,
291
        schema=None,
292
    ):
293
        """
294
        Update an existing endpoint on the TabPy.
295
296
        Parameters
297
        ----------
298
        name : str
299
            Name of the endpoint
300
        description : str, optional
301
            Description of this endpoint
302
        doc_string : str, optional
303
            The doc string for this endpoint, if needed.
304
        endpoint_type : str, optional
305
            The endpoint type (model, alias)
306
        version : str, optional
307
            The version of this endpoint
308
        dependencies=[]
309
            List of dependent endpoints for this existing endpoint
310
        target : str, optional
311
            The target endpoint name for the alias.
312
313
        Note:
314
        For those parameters that are not specified, those values will not
315
        get changed.
316
317
        """
318
        try:
319
            if (not self._check_endpoint_exists(name)):
320
                raise ValueError(f"endpoint {name} does not exist.")
321
322
            endpoints = self.get_endpoints()
323
            endpoint_info = endpoints[name]
324
325
            description = self._check_and_set_endpoint_description(
326
                description, endpoint_info["description"])
327
            docstring = self._check_and_set_endpoint_docstring(
328
                docstring, endpoint_info["docstring"])
329
            endpoint_type = self._check_and_set_endpoint_type(
330
                endpoint_type, endpoint_info["type"])
331
            dependencies = self._check_and_set_dependencies(
332
                dependencies, endpoint_info.get("dependencies", []))
333
334
            self._check_target(target)
335
            if target and target not in endpoints:
336
                raise ValueError("target endpoint is not valid.")
337
            elif not target:
338
                target = endpoint_info["target"]
339
340
            if version and not isinstance(version, int):
341
                raise ValueError("version must be an int.")
342
            elif not version:
343
                version = endpoint_info["version"]
344
345
            endpoint_info = {
346
                "description": description,
347
                "docstring": docstring,
348
                "type": endpoint_type,
349
                "version": version,
350
                "dependencies": dependencies,
351
                "target": target,
352
                "creation_time": endpoint_info["creation_time"],
353
                "last_modified_time": int(time()),
354
                "schema": schema,
355
            }
356
357
            endpoints[name] = endpoint_info
358
            self._add_update_endpoints_config(endpoints)
359
        except Exception as e:
360
            logger.error(f"Error in update_endpoint: {e}")
361
            raise
362
363 1
    @state_lock
364
    def delete_endpoint(self, name):
365
        """
366
        Delete an existing endpoint on the TabPy
367
368
        Parameters
369
        ----------
370
        name : str
371
            The name of the endpoint to be deleted.
372
373
        Returns
374
        -------
375
        deleted endpoint object
376
377
        Note:
378
        Cannot delete this endpoint if other endpoints are currently
379
        depending on this endpoint.
380
381
        """
382
        if not name or name == "":
383
            raise ValueError("Name of the endpoint must be a valid string.")
384
        endpoints = self.get_endpoints()
385
        if name not in endpoints:
386
            raise ValueError(f"Endpoint {name} does not exist.")
387
388
        endpoint_to_delete = endpoints[name]
389
390
        # get dependencies and target
391
        deps = set()
392
        for endpoint_name in endpoints:
393
            if endpoint_name != name:
394
                deps_list = endpoints[endpoint_name].get("dependencies", [])
395
                if name in deps_list:
396
                    deps.add(endpoint_name)
397
398
        # check if other endpoints are depending on this endpoint
399
        if len(deps) > 0:
400
            raise ValueError(
401
                f"Cannot remove endpoint {name}, it is currently "
402
                f"used by {list(deps)} endpoints."
403
            )
404
405
        del endpoints[name]
406
407
        # delete the endpoint from state
408
        try:
409
            self._remove_config_option(
410
                _QUERY_OBJECT_DOCSTRING, name, _update_revision=False
411
            )
412
            self._remove_config_option(_DEPLOYMENT_SECTION_NAME, name)
413
414
            return endpoint_to_delete
415
        except Exception as e:
416
            logger.error(f"Unable to delete endpoint {e}")
417
            raise ValueError(f"Unable to delete endpoint: {e}")
418
419 1
    @property
420
    def name(self):
421
        """
422
        Returns the name of the TabPy service.
423
        """
424 1
        name = None
425 1
        try:
426 1
            name = self._get_config_value(_SERVICE_INFO_SECTION_NAME, "Name")
427
        except Exception as e:
428
            logger.error(f"Unable to get name: {e}")
429 1
        return name
430
431 1
    @property
432
    def creation_time(self):
433
        """
434
        Returns the creation time of the TabPy service.
435
        """
436 1
        creation_time = 0
437 1
        try:
438 1
            creation_time = self._get_config_value(
439
                _SERVICE_INFO_SECTION_NAME, "Creation Time"
440
            )
441
        except Exception as e:
442
            logger.error(f"Unable to get name: {e}")
443 1
        return creation_time
444
445 1
    @state_lock
446
    def set_name(self, name):
447
        """
448
        Set the name of this TabPy service.
449
450
        Parameters
451
        ----------
452
        name : str
453
            Name of TabPy service.
454
        """
455
        if not isinstance(name, str):
456
            raise ValueError("name must be a string.")
457
        try:
458
            self._set_config_value(_SERVICE_INFO_SECTION_NAME, "Name", name)
459
        except Exception as e:
460
            logger.error(f"Unable to set name: {e}")
461
462 1
    def get_description(self):
463
        """
464
        Returns the description of the TabPy service.
465
        """
466 1
        description = None
467 1
        try:
468 1
            description = self._get_config_value(
469
                _SERVICE_INFO_SECTION_NAME, "Description"
470
            )
471
        except Exception as e:
472
            logger.error(f"Unable to get description: {e}")
473 1
        return description
474
475 1
    @state_lock
476
    def set_description(self, description):
477
        """
478
        Set the description of this TabPy service.
479
480
        Parameters
481
        ----------
482
        description : str
483
            Description of TabPy service.
484
        """
485
        if not isinstance(description, str):
486
            raise ValueError("Description must be a string.")
487
        try:
488
            self._set_config_value(
489
                _SERVICE_INFO_SECTION_NAME, "Description", description
490
            )
491
        except Exception as e:
492
            logger.error(f"Unable to set description: {e}")
493
494 1
    def get_revision_number(self):
495
        """
496
        Returns the revision number of this TabPy service.
497
        """
498
        rev = -1
499
        try:
500
            rev = int(self._get_config_value(_META_SECTION_NAME, "Revision Number"))
501
        except Exception as e:
502
            logger.error(f"Unable to get revision number: {e}")
503
        return rev
504
505 1
    def get_access_control_allow_origin(self):
506
        """
507
        Returns Access-Control-Allow-Origin of this TabPy service.
508
        """
509 1
        _cors_origin = ""
510 1
        try:
511 1
            logger.debug("Collecting Access-Control-Allow-Origin from state file ...")
512 1
            _cors_origin = self._get_config_value(
513
                "Service Info", "Access-Control-Allow-Origin"
514
            )
515
        except Exception as e:
516
            logger.error(e)
517 1
        return _cors_origin
518
519 1
    def get_access_control_allow_headers(self):
520
        """
521
        Returns Access-Control-Allow-Headers of this TabPy service.
522
        """
523 1
        _cors_headers = ""
524 1
        try:
525 1
            _cors_headers = self._get_config_value(
526
                "Service Info", "Access-Control-Allow-Headers"
527
            )
528
        except Exception:
529
            pass
530 1
        return _cors_headers
531
532 1
    def get_access_control_allow_methods(self):
533
        """
534
        Returns Access-Control-Allow-Methods of this TabPy service.
535
        """
536 1
        _cors_methods = ""
537 1
        try:
538 1
            _cors_methods = self._get_config_value(
539
                "Service Info", "Access-Control-Allow-Methods"
540
            )
541
        except Exception:
542
            pass
543 1
        return _cors_methods
544
545 1
    def _set_revision_number(self, revision_number):
546
        """
547
        Set the revision number of this TabPy service.
548
        """
549
        if not isinstance(revision_number, int):
550
            raise ValueError("revision number must be an int.")
551
        try:
552
            self._set_config_value(
553
                _META_SECTION_NAME, "Revision Number", revision_number
554
            )
555
        except Exception as e:
556
            logger.error(f"Unable to set revision number: {e}")
557
558 1
    def _remove_config_option(
559
        self,
560
        section_name,
561
        option_name,
562
        logger=logging.getLogger(__name__),
563
        _update_revision=True,
564
    ):
565
        if not self.config:
566
            raise ValueError("State configuration not yet loaded.")
567
        self.config.remove_option(section_name, option_name)
568
        # update revision number
569
        if _update_revision:
570
            self._increase_revision_number()
571
        self._write_state(logger=logger)
572
573 1
    def _has_config_value(self, section_name, option_name):
574
        if not self.config:
575
            raise ValueError("State configuration not yet loaded.")
576
        return self.config.has_option(section_name, option_name)
577
578 1
    def _increase_revision_number(self):
579
        if not self.config:
580
            raise ValueError("State configuration not yet loaded.")
581
        cur_rev = int(self.config.get(_META_SECTION_NAME, "Revision Number"))
582
        self.config.set(_META_SECTION_NAME, "Revision Number", str(cur_rev + 1))
583
584 1
    def _set_config_value(
585
        self,
586
        section_name,
587
        option_name,
588
        option_value,
589
        logger=logging.getLogger(__name__),
590
        _update_revision=True,
591
    ):
592
        if not self.config:
593
            raise ValueError("State configuration not yet loaded.")
594
595
        if not self.config.has_section(section_name):
596
            logger.log(logging.DEBUG, f"Adding config section {section_name}")
597
            self.config.add_section(section_name)
598
599
        self.config.set(section_name, option_name, option_value)
600
        # update revision number
601
        if _update_revision:
602
            self._increase_revision_number()
603
        self._write_state(logger=logger)
604
605 1
    def _get_config_items(self, section_name):
606
        if not self.config:
607
            raise ValueError("State configuration not yet loaded.")
608
        return self.config.items(section_name)
609
610 1
    def _get_config_value(
611
        self, section_name, option_name, optional=False, default_value=None
612
    ):
613 1
        logger.log(
614
            logging.DEBUG,
615
            f"Loading option '{option_name}' from section [{section_name}]...")
616
617 1
        if not self.config:
618
            msg = "State configuration not yet loaded."
619
            logging.log(msg)
620
            raise ValueError(msg)
621
622 1
        res = None
623 1
        if not option_name:
624 1
            res = self.config.options(section_name)
625 1
        elif self.config.has_option(section_name, option_name):
626 1
            res = self.config.get(section_name, option_name)
627
        elif optional:
628
            res = default_value
629
        else:
630
            raise ValueError(
631
                f"Cannot find option name {option_name} "
632
                f"under section {section_name}"
633
            )
634
635 1
        logger.log(logging.DEBUG, f"Returning value '{res}'")
636 1
        return res
637
638 1
    def _write_state(self, logger=logging.getLogger(__name__)):
639
        """
640
        Write state (ConfigParser) to Consul
641
        """
642
        logger.log(logging.INFO, "Writing state to config")
643
        write_state_config(self.config, self.settings, logger=logger)
644