Test Failed
Pull Request — master (#605)
by Olivier
03:57
created

HistoryDict   B

Complexity

Total Complexity 52

Size/Duplication

Total Lines 96
Duplicated Lines 47.92 %

Test Coverage

Coverage 83.54%

Importance

Changes 1
Bugs 1 Features 0
Metric Value
c 1
b 1
f 0
dl 46
loc 96
ccs 66
cts 79
cp 0.8354
rs 7.9487
wmc 52

8 Methods

Rating   Name                     Duplication   Size   Complexity
A        new_historized_node()    0             5      2
B        save_event()             0             10     6
F        read_event_history()     23            23     17
A        stop()                   0             2      1
A        new_historized_event()   0             5      2
A        __init__()               0             6      1
B        save_node_value()        0             10     6
F        read_node_history()      23            23     17


Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:
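
In this class the duplication sits in read_node_history() and read_event_history(), which differ only in the list they read and the timestamp attribute they compare. One possible fix is to extract the shared selection logic into a helper. The following is a minimal sketch, not the project's code: `select_by_time`, `get_time` and `WIN_EPOCH` are hypothetical names, and `WIN_EPOCH` stands in for `ua.get_win_epoch()`, which is assumed to return 1601-01-01. The sketch also takes the continuation point from the first item that did not fit, which is an assumption about the intended behaviour.

```python
from datetime import datetime

# Stand-in for ua.get_win_epoch(); assumed to be 1601-01-01 (the "no time given" marker).
WIN_EPOCH = datetime(1601, 1, 1)


def select_by_time(items, start, end, nb_values, get_time):
    """Return (results, continuation) for a list of items ordered oldest first.

    get_time extracts the timestamp from one item, for example
    lambda dv: dv.SourceTimestamp for data values or lambda ev: ev.Time for events.
    """
    start = WIN_EPOCH if start is None else start
    end = WIN_EPOCH if end is None else end
    if start == WIN_EPOCH:
        # No start time: walk backwards from the newest item
        # (as in the original methods, `end` is not applied as a bound here).
        results = [it for it in reversed(items) if start <= get_time(it)]
    elif end == WIN_EPOCH:
        # No end time: everything from start onwards, oldest first.
        results = [it for it in items if start <= get_time(it)]
    elif start > end:
        # Reversed range: newest first, both bounds inclusive.
        results = [it for it in reversed(items) if end <= get_time(it) <= start]
    else:
        results = [it for it in items if start <= get_time(it) <= end]
    cont = None
    if nb_values and len(results) > nb_values:
        # The first item that did not fit becomes the continuation point.
        cont = get_time(results[nb_values])
        results = results[:nb_values]
    return results, cont
```

With such a helper, each read method would shrink to the "is this id historized" check plus one call, passing its own list and timestamp accessor.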

Complex Class

Tip: Before tackling complexity, eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like HistoryDict often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
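
In HistoryDict, the fields `_datachanges`/`_datachanges_period` and `_events`/`_events_periods` share prefixes and are always used together: a list of items plus a `(period, count)` retention rule. An Extract Class step could look roughly like the sketch below. `BoundedHistory` and its `get_time` and `now` parameters are hypothetical names introduced for illustration; the trimming rules mirror save_node_value() and save_event().

```python
from datetime import datetime, timedelta


class BoundedHistory(object):
    """Hypothetical helper: a list of items trimmed by maximum age and by count."""

    def __init__(self, period, count, get_time):
        self.items = []
        self.period = period        # timedelta (or None): maximum age to keep
        self.count = count          # int: maximum number of items, 0 means unlimited
        self._get_time = get_time   # callable returning an item's timestamp

    def append(self, item, now=None):
        if now is None:
            now = datetime.utcnow()  # naive UTC, as in the original methods
        self.items.append(item)
        if self.period:
            # Drop items from the front while they are older than the period.
            while self.items and now - self._get_time(self.items[0]) > self.period:
                self.items.pop(0)
        if self.count and len(self.items) > self.count:
            # At most one item is added per call, so one pop is enough.
            self.items.pop(0)
```

HistoryDict would then hold one dictionary of `BoundedHistory` objects for nodes and one for events, and both save methods would reduce to a lookup followed by `append()`.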

import logging
from datetime import timedelta
from datetime import datetime

from opcua import Subscription
from opcua import ua
from opcua.common import utils


class UaNodeAlreadyHistorizedError(ua.UaError):
    pass


class HistoryStorageInterface(object):

    """
    Interface of a history backend.
    Must be implemented by backends
    """

    def new_historized_node(self, node_id, period, count=0):
        """
        Called when a new node is to be historized
        Returns None
        """
        raise NotImplementedError

    def save_node_value(self, node_id, datavalue):
        """
        Called when the value of a historized node has changed and should be saved in history
        Returns None
        """
        raise NotImplementedError

    def read_node_history(self, node_id, start, end, nb_values):
        """
        Called when a client makes a history read request for a node
        If start or end is missing, nb_values is used to limit the query
        nb_values is the max number of values to read. Ignored if 0
        Start time and end time are inclusive
        Returns a list of DataValues and a continuation point which
        is None if all nodes are read or the SourceTimeStamp of the last rejected DataValue
        """
        raise NotImplementedError

    def new_historized_event(self, source_id, evtypes, period, count=0):
        """
        Called when historization of events is enabled on server side
        Returns None
        """
        raise NotImplementedError

    def save_event(self, event):
        """
        Called when a new event has been generated and should be saved in history
        Returns None
        """
        raise NotImplementedError

    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        """
        Called when a client makes a history read request for events
        Start time and end time are inclusive
        Returns a list of Events and a continuation point which
        is None if all events are read or the SourceTimeStamp of the last rejected event
        """
        raise NotImplementedError

    def stop(self):
        """
        Called when the server shuts down
        Can be used to close database connections etc.
        """
        raise NotImplementedError


class HistoryDict(HistoryStorageInterface):
    """
    Very minimal history backend storing data in memory using a Python dictionary
    """

    def __init__(self):
        self._datachanges = {}
        self._datachanges_period = {}
        self._events = {}
        self._events_periods = {}
        self.logger = logging.getLogger(__name__)

    def new_historized_node(self, node_id, period, count=0):
        if node_id in self._datachanges:
            raise UaNodeAlreadyHistorizedError(node_id)
        self._datachanges[node_id] = []
        self._datachanges_period[node_id] = period, count

    def save_node_value(self, node_id, datavalue):
        data = self._datachanges[node_id]
        period, count = self._datachanges_period[node_id]
        data.append(datavalue)
        now = datetime.utcnow()
        if period:
            while len(data) and now - data[0].SourceTimestamp > period:
                data.pop(0)
        if count and len(data) > count:
            data.pop(0)

    # Review note: this code seems to be duplicated in your project.
    def read_node_history(self, node_id, start, end, nb_values):
        cont = None
        if node_id not in self._datachanges:
            self.logger.warning("Error attempt to read history for a node which is not historized")
            return [], cont
        else:
            if start is None:
                start = ua.get_win_epoch()
            if end is None:
                end = ua.get_win_epoch()
            if start == ua.get_win_epoch():
                results = [dv for dv in reversed(self._datachanges[node_id]) if start <= dv.SourceTimestamp]
            elif end == ua.get_win_epoch():
                results = [dv for dv in self._datachanges[node_id] if start <= dv.SourceTimestamp]
            elif start > end:
                results = [dv for dv in reversed(self._datachanges[node_id]) if end <= dv.SourceTimestamp <= start]

            else:
                results = [dv for dv in self._datachanges[node_id] if start <= dv.SourceTimestamp <= end]
            if nb_values and len(results) > nb_values:
                # results[nb_values] is the first value that did not fit; indexing
                # nb_values + 1 skipped it and raised IndexError when exactly one value was left over
                cont = results[nb_values].SourceTimestamp
                results = results[:nb_values]
            return results, cont

    def new_historized_event(self, source_id, evtypes, period, count=0):
        if source_id in self._events:
            raise UaNodeAlreadyHistorizedError(source_id)
        self._events[source_id] = []
        self._events_periods[source_id] = period, count

    def save_event(self, event):
        evts = self._events[event.SourceNode]
        evts.append(event)
        period, count = self._events_periods[event.SourceNode]
        now = datetime.utcnow()
        if period:
            while len(evts) and now - evts[0].SourceTimestamp > period:
                evts.pop(0)
        if count and len(evts) > count:
            evts.pop(0)

    # Review note: this code seems to be duplicated in your project.
    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        cont = None
        if source_id not in self._events:
            # use the class logger instead of print(), as read_node_history does
            self.logger.warning("Error attempt to read event history for a node which does not historize events")
            return [], cont
        else:
            if start is None:
                start = ua.get_win_epoch()
            if end is None:
                end = ua.get_win_epoch()
            if start == ua.get_win_epoch():
                results = [ev for ev in reversed(self._events[source_id]) if start <= ev.Time]
            elif end == ua.get_win_epoch():
                results = [ev for ev in self._events[source_id] if start <= ev.Time]
            elif start > end:
                results = [ev for ev in reversed(self._events[source_id]) if end <= ev.Time <= start]

            else:
                results = [ev for ev in self._events[source_id] if start <= ev.Time <= end]
            if nb_values and len(results) > nb_values:
                # results[nb_values] is the first event that did not fit; indexing
                # nb_values + 1 skipped it and raised IndexError when exactly one event was left over
                cont = results[nb_values].Time
                results = results[:nb_values]
            return results, cont

    def stop(self):
        pass


class SubHandler(object):
    def __init__(self, storage):
        self.storage = storage

    def datachange_notification(self, node, val, data):
        self.storage.save_node_value(node.nodeid, data.monitored_item.Value)

    def event_notification(self, event):
        self.storage.save_event(event)


class HistoryManager(object):
    def __init__(self, iserver):
        self.logger = logging.getLogger(__name__)
        self.iserver = iserver
        self.storage = HistoryDict()
        self._sub = None
        self._handlers = {}

    def set_storage(self, storage):
        """
        set the desired HistoryStorageInterface which History Manager will use for historizing
        """
        self.storage = storage

    def _create_subscription(self, handler):
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = 10
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 0
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.iserver.isession, params, handler)

    def historize_data_change(self, node, period=timedelta(days=7), count=0):
        """
        Subscribe to the node's data changes and store the data in the active storage.
        """
        if not self._sub:
            self._sub = self._create_subscription(SubHandler(self.storage))
        if node in self._handlers:
            raise ua.UaError("Node {0} is already historized".format(node))
        self.storage.new_historized_node(node.nodeid, period, count)
        handler = self._sub.subscribe_data_change(node)
        self._handlers[node] = handler

    def historize_event(self, source, period=timedelta(days=7), count=0):
        """
        Subscribe to the source node's events and store the data in the active storage.

        SQL Implementation
        The default is to historize every event type the source generates; custom event properties are included. At
        this time there is no way to historize a specific event type. The user software can filter out events which are
        not desired when reading.

        Note that adding custom events to a source node AFTER historizing has been activated is not supported at this
        time (in SQL history there will be no columns in the SQL table for the new event properties). For SQL, the table
        must be deleted manually so that a new table with the custom event fields can be created.
        """
        if not self._sub:
            self._sub = self._create_subscription(SubHandler(self.storage))
        if source in self._handlers:
            raise ua.UaError("Events from {0} are already historized".format(source))

        # get list of all event types that the source node generates; change this to only historize specific events
        event_types = source.get_referenced_nodes(ua.ObjectIds.GeneratesEvent)

        self.storage.new_historized_event(source.nodeid, event_types, period, count)

        handler = self._sub.subscribe_events(source, event_types)
        self._handlers[source] = handler

    def dehistorize(self, node):
        """
        Remove subscription to the node/source which is being historized

        SQL Implementation
        Only the subscription is removed. The historical data remains.
        """
        if node in self._handlers:
            self._sub.unsubscribe(self._handlers[node])
            del self._handlers[node]
        else:
            self.logger.error("History Manager isn't subscribed to %s", node)

    def read_history(self, params):
        """
        Read history for a node
        This is part of the AttributeService, but implemented as its own service
        since it requires more logic than other attribute service methods
        """
        results = []

        for rv in params.NodesToRead:
            res = self._read_history(params.HistoryReadDetails, rv)
            results.append(res)
        return results

    def _read_history(self, details, rv):
        """
        determine if the history read is for data changes or events; then read the history for that node
        """
        result = ua.HistoryReadResult()
        if isinstance(details, ua.ReadRawModifiedDetails):
            if details.IsReadModified:
                result.HistoryData = ua.HistoryModifiedData()
                # we do not support modified history by design so we return what we have
            else:
                result.HistoryData = ua.HistoryData()
            dv, cont = self._read_datavalue_history(rv, details)
            result.HistoryData.DataValues = dv
            result.ContinuationPoint = cont

        elif isinstance(details, ua.ReadEventDetails):
            result.HistoryData = ua.HistoryEvent()
            # FIXME: filter is a cumbersome type, maybe transform it into something easier
            # to handle for storage
            ev, cont = self._read_event_history(rv, details)
            result.HistoryData.Events = ev
            result.ContinuationPoint = cont

        else:
            # we do not currently support the other types, clients can process data themselves
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
        return result

    def _read_datavalue_history(self, rv, details):
        starttime = details.StartTime
        if rv.ContinuationPoint:
            # Spec says we should ignore details if cont point is present
            # but they also say we can use cont point as timestamp to enable stateless
            # implementation. This is contradictory, so we assume details is
            # sent correctly with continuation point
            starttime = ua.ua_binary.Primitives.DateTime.unpack(utils.Buffer(rv.ContinuationPoint))

        dv, cont = self.storage.read_node_history(rv.NodeId,
                                                  starttime,
                                                  details.EndTime,
                                                  details.NumValuesPerNode)
        if cont:
            cont = ua.ua_binary.Primitives.DateTime.pack(cont)
        # rv.IndexRange
        # rv.DataEncoding # xml or binary, seems spec say we can ignore that one
        return dv, cont

    def _read_event_history(self, rv, details):
        starttime = details.StartTime
        if rv.ContinuationPoint:
            # Spec says we should ignore details if cont point is present
            # but they also say we can use cont point as timestamp to enable stateless
            # implementation. This is contradictory, so we assume details is
            # sent correctly with continuation point
            starttime = ua.ua_binary.Primitives.DateTime.unpack(utils.Buffer(rv.ContinuationPoint))

        evts, cont = self.storage.read_event_history(rv.NodeId,
                                                     starttime,
                                                     details.EndTime,
                                                     details.NumValuesPerNode,
                                                     details.Filter)
        results = []
        for ev in evts:
            field_list = ua.HistoryEventFieldList()
            field_list.EventFields = ev.to_event_fields(details.Filter.SelectClauses)
            results.append(field_list)
        if cont:
            cont = ua.ua_binary.Primitives.DateTime.pack(cont)
        return results, cont

    def update_history(self, params):
        """
        Update history for a node
        This is part of the AttributeService, but implemented as its own service
        since it requires more logic than other attribute service methods
        """
        results = []
        for _ in params.HistoryUpdateDetails:
            result = ua.HistoryUpdateResult()
            # we do not accept to rewrite history
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotWritable)
            # append the per-item result (the original appended the list to itself)
            results.append(result)
        return results

    def stop(self):
        """
        call stop methods of active storage interface whenever the server is stopped
        """
        self.storage.stop()
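
The two _read_*_history() helpers above turn the continuation point into bytes with `ua.ua_binary.Primitives.DateTime.pack()` and back with `unpack()`, which is what makes the paging stateless: the next request simply carries the timestamp to resume from. The sketch below illustrates that round trip without the opcua package. It is not the library's implementation; `pack_datetime` and `unpack_datetime` are hypothetical names, and the code assumes the OPC UA binary DateTime layout, a little-endian signed 64-bit count of 100-nanosecond intervals since 1601-01-01 UTC.

```python
import struct
from datetime import datetime, timedelta

# Assumed OPC UA / Windows epoch and tick size (100 ns per tick).
WIN_EPOCH = datetime(1601, 1, 1)
TICKS_PER_SECOND = 10 ** 7


def pack_datetime(dt):
    """Encode a naive UTC datetime as 8 bytes (little-endian int64 tick count)."""
    delta = dt - WIN_EPOCH
    ticks = (delta.days * 86400 + delta.seconds) * TICKS_PER_SECOND + delta.microseconds * 10
    return struct.pack("<q", ticks)


def unpack_datetime(data):
    """Decode 8 bytes back into a naive UTC datetime (sub-microsecond ticks are dropped)."""
    (ticks,) = struct.unpack("<q", data)
    return WIN_EPOCH + timedelta(microseconds=ticks // 10)
```

A client that receives such a continuation point sends it back unchanged in its next read request, and the server decodes it to obtain the new start time.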