Passed
Pull Request — master (#644)
by
unknown
21:04 queued 04:15
created

tabpy.tabpy_server.common.messages.Msg.to_json()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1.037

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 3
ccs 2
cts 3
cp 0.6667
rs 10
c 0
b 0
f 0
cc 1
nop 1
crap 1.037
1 1
import abc
2 1
from abc import ABCMeta
3 1
from collections import namedtuple
4 1
import json
5
6
7 1
class Msg:
    """
    An abstract base class for all messages used for communicating between
    the WebServices.

    The minimal functionality is the ability to instantiate a Msg from JSON
    and to write a Msg instance to JSON.

    We use namedtuples because they are lightweight and immutable. The splat
    operator (*) that we inherit from namedtuple is also convenient. We empty
    __slots__ to avoid unnecessary overhead.

    Concrete messages are expected to mix this class in next to a
    ``namedtuple`` base, because ``for_json`` relies on ``self._asdict()``.
    """

    # NOTE: on Python 3 ``__metaclass__`` is an ordinary class attribute and
    # does not install ABCMeta, so the ``abstractmethod`` markers below are
    # not enforced. It is kept unchanged for backward compatibility.
    # NOTE(review): Msg itself defines no ``__slots__``, so subclasses that
    # set ``__slots__ = ()`` still get a per-instance ``__dict__`` - confirm
    # whether that is intended.
    __metaclass__ = ABCMeta

    @abc.abstractmethod
    def for_json(self):
        """
        Return the message as a dict that is ready for JSON encoding.

        The dict holds every namedtuple field plus a ``"type"`` key whose
        value is the name of the concrete message class.
        """
        d = self._asdict()
        d["type"] = self.__class__.__name__
        return d

    @abc.abstractmethod
    def to_json(self):
        """Return the message serialized as a JSON string."""
        return json.dumps(self.for_json())

    @staticmethod
    def from_json(str):
        """
        Build a message instance from a JSON string made by ``to_json``.

        Parameters
        ----------
        str
            JSON text of an object holding a ``"type"`` key (the name of a
            Msg subclass) and one key per field of that message.

        Returns
        -------
        An instance of the Msg subclass named by ``"type"``.

        Raises
        ------
        KeyError
            If the payload has no ``"type"`` key.
        NameError
            If ``"type"`` does not name a known Msg subclass.
        """
        d = json.loads(str)
        type_str = d["type"]
        del d["type"]

        # SECURITY: the type name comes from the payload, so it must never be
        # passed to eval() - that would let a crafted message run arbitrary
        # code. Resolve it against the Msg subclass tree instead.
        pending = list(Msg.__subclasses__())
        while pending:
            candidate = pending.pop()
            if candidate.__name__ == type_str:
                return candidate(**d)
            pending.extend(candidate.__subclasses__())

        # NameError matches what the previous eval()-based lookup raised for
        # an unknown name, so existing handlers keep working.
        raise NameError("name %r is not defined as a Msg type" % (type_str,))
39
40
41 1
class LoadSuccessful(
    namedtuple(
        "LoadSuccessful",
        ["uri", "path", "version", "is_update", "endpoint_type"],
    ),
    Msg,
):
    """Message with fields uri, path, version, is_update and endpoint_type."""

    __slots__ = ()
48
49
50 1
class LoadFailed(
    namedtuple("LoadFailed", ["uri", "version", "error_msg"]),
    Msg,
):
    """Message with fields uri, version and error_msg."""

    __slots__ = ()
52
53
54 1
class LoadInProgress(
    namedtuple(
        "LoadInProgress",
        ["uri", "path", "version", "is_update", "endpoint_type"],
    ),
    Msg,
):
    """Message with fields uri, path, version, is_update and endpoint_type."""

    __slots__ = ()
61
62
63 1
class Query(
    namedtuple("Query", ["uri", "params"]),
    Msg,
):
    """Message with fields uri and params."""

    __slots__ = ()
65
66
67 1
class QuerySuccessful(
    namedtuple("QuerySuccessful", ["uri", "version", "response"]),
    Msg,
):
    """Message with fields uri, version and response."""

    __slots__ = ()
71
72
73 1
class LoadObject(
    namedtuple(
        "LoadObject",
        ["uri", "url", "version", "is_update", "endpoint_type"],
    ),
    Msg,
):
    """Message with fields uri, url, version, is_update and endpoint_type."""

    __slots__ = ()
78
79
80 1
class DeleteObjects(
    namedtuple("DeleteObjects", ["uris"]),
    Msg,
):
    """Message with a single field, uris."""

    __slots__ = ()
82
83
84
# Used for testing to flush out objects
class FlushObjects(
    namedtuple("FlushObjects", []),
    Msg,
):
    """Field-less message; used for testing to flush out objects."""

    __slots__ = ()
87
88
89 1
class ObjectsDeleted(
    namedtuple("ObjectsDeleted", ["uris"]),
    Msg,
):
    """Message with a single field, uris."""

    __slots__ = ()
91
92
93 1
class ObjectsFlushed(
    namedtuple("ObjectsFlushed", ["n_before", "n_after"]),
    Msg,
):
    """Message with fields n_before and n_after."""

    __slots__ = ()
95
96
97 1
class CountObjects(
    namedtuple("CountObjects", []),
    Msg,
):
    """Message with no fields."""

    __slots__ = ()
99
100
101 1
class ObjectCount(
    namedtuple("ObjectCount", ["count"]),
    Msg,
):
    """Message with a single field, count."""

    __slots__ = ()
103
104
105 1
class ListObjects(
    namedtuple("ListObjects", []),
    Msg,
):
    """Message with no fields."""

    __slots__ = ()
107
108
109 1
class ObjectList(
    namedtuple("ObjectList", ["objects"]),
    Msg,
):
    """Message with a single field, objects."""

    __slots__ = ()
111
112
113 1
class UnknownURI(
    namedtuple("UnknownURI", ["uri"]),
    Msg,
):
    """Message with a single field, uri."""

    __slots__ = ()
115
116
117 1
class UnknownMessage(
    namedtuple("UnknownMessage", ["msg"]),
    Msg,
):
    """Message with a single field, msg."""

    __slots__ = ()
119
120
121 1
class DownloadSkipped(
    namedtuple("DownloadSkipped", ["uri", "version", "msg", "host"]),
    Msg,
):
    """Message with fields uri, version, msg and host."""

    __slots__ = ()
125
126
127 1
class QueryFailed(
    namedtuple("QueryFailed", ["uri", "error"]),
    Msg,
):
    """Message with fields uri and error."""

    __slots__ = ()
129
130
131 1
class QueryError(
    namedtuple("QueryError", ["uri", "error"]),
    Msg,
):
    """Message with fields uri and error."""

    __slots__ = ()
133
134
135 1
class CheckHealth(
    namedtuple("CheckHealth", []),
    Msg,
):
    """Message with no fields."""

    __slots__ = ()
137
138
139 1
class Healthy(
    namedtuple("Healthy", []),
    Msg,
):
    """Message with no fields."""

    __slots__ = ()
141
142
143 1
class Unhealthy(
    namedtuple("Unhealthy", []),
    Msg,
):
    """Message with no fields."""

    __slots__ = ()
145
146
147 1
class Ping(
    namedtuple("Ping", ["id"]),
    Msg,
):
    """Message with a single field, id."""

    __slots__ = ()
149
150
151 1
class Pong(
    namedtuple("Pong", ["id"]),
    Msg,
):
    """Message with a single field, id."""

    __slots__ = ()
153
154
155 1
class Listening(
    namedtuple("Listening", []),
    Msg,
):
    """Message with no fields."""

    __slots__ = ()
157
158
159 1
class EngineFailure(
    namedtuple("EngineFailure", ["error"]),
    Msg,
):
    """Message with a single field, error."""

    __slots__ = ()
161
162
163 1
class FlushLogs(
    namedtuple("FlushLogs", []),
    Msg,
):
    """Message with no fields."""

    __slots__ = ()
165
166
167 1
class LogsFlushed(
    namedtuple("LogsFlushed", []),
    Msg,
):
    """Message with no fields."""

    __slots__ = ()
169
170
171 1
class ServiceError(
    namedtuple("ServiceError", ["error"]),
    Msg,
):
    """Message with a single field, error."""

    __slots__ = ()
173