Completed
Pull Request — master (#509)
by
unknown
04:56
created

HistoryDict.new_historized_node()   A

Complexity

Conditions 2

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2.032

Importance

Changes 0
Metric Value
cc 2
c 0
b 0
f 0
dl 0
loc 5
ccs 4
cts 5
cp 0.8
crap 2.032
rs 9.4285
1 1
import logging
2 1
from datetime import timedelta
3 1
from datetime import datetime
4
5 1
from opcua import Subscription
6 1
from opcua import ua
7 1
from opcua.common import utils
8
9
10 1
class UaNodeAlreadyHistorizedError(ua.UaError):
11 1
    pass
12
13
14 1
class HistoryStorageInterface(object):
15
16
    """
17
    Interface of a history backend.
18
    Must be implemented by backends
19
    """
20
21 1
    def new_historized_node(self, node_id, period, count=0):
22
        """
23
        Called when a new node is to be historized
24
        Returns None
25
        """
26
        raise NotImplementedError
27
28 1
    def save_node_value(self, node_id, datavalue):
29
        """
30
        Called when the value of a historized node has changed and should be saved in history
31
        Returns None
32
        """
33
        raise NotImplementedError
34
35 1
    def read_node_history(self, node_id, start, end, nb_values):
36
        """
37
        Called when a client make a history read request for a node
38
        if start or end is missing then nb_values is used to limit query
39
        nb_values is the max number of values to read. Ignored if 0
40
        Start time and end time are inclusive
41
        Returns a list of DataValues and a continuation point which
42
        is None if all nodes are read or the ServerTimeStamp of the last rejected DataValue
43
        """
44
        raise NotImplementedError
45
46 1
    def new_historized_event(self, source_id, evtypes, period, count=0):
47
        """
48
        Called when historization of events is enabled on server side
49
        Returns None
50
        """
51
        raise NotImplementedError
52
53 1
    def save_event(self, event):
54
        """
55
        Called when a new event has been generated ans should be saved in history
56
        Returns None
57
        """
58
        raise NotImplementedError
59
60 1
    def read_event_history(self, source_id, start, end, nb_values, evfilter):
61
        """
62
        Called when a client make a history read request for events
63
        Start time and end time are inclusive
64
        Returns a list of Events and a continuation point which
65
        is None if all events are read or the ServerTimeStamp of the last rejected event
66
        """
67
        raise NotImplementedError
68
69 1
    def stop(self):
70
        """
71
        Called when the server shuts down
72
        Can be used to close database connections etc.
73
        """
74
        raise NotImplementedError
75
76
77 1
class HistoryDict(HistoryStorageInterface):
78
    """
79
    Very minimal history backend storing data in memory using a Python dictionary
80
    """
81
82 1
    def __init__(self):
83 1
        self._datachanges = {}
84 1
        self._datachanges_period = {}
85 1
        self._events = {}
86 1
        self._events_periods = {}
87
88 1
    def new_historized_node(self, node_id, period, count=0):
89 1
        if node_id in self._datachanges:
90
            raise UaNodeAlreadyHistorizedError(node_id)
91 1
        self._datachanges[node_id] = []
92 1
        self._datachanges_period[node_id] = period, count
93
94 1
    def save_node_value(self, node_id, datavalue):
95 1
        data = self._datachanges[node_id]
96 1
        period, count = self._datachanges_period[node_id]
97 1
        data.append(datavalue)
98 1
        now = datetime.utcnow()
99 1
        if period:
100 1
            while len(data) and now - data[0].ServerTimestamp > period:
101 1
                data.pop(0)
102 1
        if count and len(data) > count:
103 1
            data.pop(0)
104
105 1 View Code Duplication
    def read_node_history(self, node_id, start, end, nb_values):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
106 1
        cont = None
107 1
        if node_id not in self._datachanges:
108
            print("Error attempt to read history for a node which is not historized")
109
            return [], cont
110
        else:
111 1
            if start is None:
112 1
                start = ua.get_win_epoch()
113 1
            if end is None:
114 1
                end = ua.get_win_epoch()
115 1
            if start == ua.get_win_epoch():
116 1
                results = [dv for dv in reversed(self._datachanges[node_id]) if start <= dv.ServerTimestamp]
117 1
            elif end == ua.get_win_epoch():
118 1
                results = [dv for dv in self._datachanges[node_id] if start <= dv.ServerTimestamp]
119 1
            elif start > end:
120 1
                results = [dv for dv in reversed(self._datachanges[node_id]) if end <= dv.ServerTimestamp <= start]
121
122
            else:
123 1
                results = [dv for dv in self._datachanges[node_id] if start <= dv.ServerTimestamp <= end]
124 1
            if nb_values and len(results) > nb_values:
125 1
                cont = results[nb_values + 1].ServerTimestamp
126 1
                results = results[:nb_values]
127 1
            return results, cont
128
129 1
    def new_historized_event(self, source_id, evtypes, period, count=0):
130 1
        if source_id in self._events:
131
            raise UaNodeAlreadyHistorizedError(source_id)
132 1
        self._events[source_id] = []
133 1
        self._events_periods[source_id] = period, count
134
135 1
    def save_event(self, event):
136 1
        evts = self._events[event.SourceNode]
137
        evts.append(event)
138
        period, count = self._events_periods[event.SourceNode]
139
        now = datetime.utcnow()
140
        if period:
141
            while len(evts) and now - evts[0].ServerTimestamp > period:
142
                evts.pop(0)
143
        if count and len(evts) > count:
144
            evts.pop(0)
145
146 1 View Code Duplication
    def read_event_history(self, source_id, start, end, nb_values, evfilter):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
147 1
        cont = None
148 1
        if source_id not in self._events:
149
            print("Error attempt to read event history for a node which does not historize events")
150
            return [], cont
151
        else:
152 1
            if start is None:
153
                start = ua.get_win_epoch()
154 1
            if end is None:
155
                end = ua.get_win_epoch()
156 1
            if start == ua.get_win_epoch():
157 1
                results = [ev for ev in reversed(self._events[source_id]) if start <= ev.Time]
158 1
            elif end == ua.get_win_epoch():
159 1
                results = [ev for ev in self._events[source_id] if start <= ev.Time]
160 1
            elif start > end:
161 1
                results = [ev for ev in reversed(self._events[source_id]) if end <= ev.Time <= start]
162
163
            else:
164 1
                results = [ev for ev in self._events[source_id] if start <= ev.Time <= end]
165 1
            if nb_values and len(results) > nb_values:
166
                cont = results[nb_values + 1].Time
167
                results = results[:nb_values]
168 1
            return results, cont
169
170 1
    def stop(self):
171 1
        pass
172
173
174 1
class SubHandler(object):
175 1
    def __init__(self, storage):
176 1
        self.storage = storage
177
178 1
    def datachange_notification(self, node, val, data):
179 1
        self.storage.save_node_value(node.nodeid, data.monitored_item.Value)
180
181 1
    def event_notification(self, event):
182 1
        self.storage.save_event(event)
183
184
185 1
class HistoryManager(object):
186 1
    def __init__(self, iserver):
187 1
        self.logger = logging.getLogger(__name__)
188 1
        self.iserver = iserver
189 1
        self.storage = HistoryDict()
190 1
        self._sub = None
191 1
        self._handlers = {}
192
193 1
    def set_storage(self, storage):
194
        """
195
        set the desired HistoryStorageInterface which History Manager will use for historizing
196
        """
197 1
        self.storage = storage
198
199 1
    def _create_subscription(self, handler):
200 1
        params = ua.CreateSubscriptionParameters()
201 1
        params.RequestedPublishingInterval = 10
202 1
        params.RequestedLifetimeCount = 3000
203 1
        params.RequestedMaxKeepAliveCount = 10000
204 1
        params.MaxNotificationsPerPublish = 0
205 1
        params.PublishingEnabled = True
206 1
        params.Priority = 0
207 1
        return Subscription(self.iserver.isession, params, handler)
208
209 1
    def historize_data_change(self, node, period=timedelta(days=7), count=0):
210
        """
211
        Subscribe to the nodes' data changes and store the data in the active storage.
212
        """
213 1
        if not self._sub:
214 1
            self._sub = self._create_subscription(SubHandler(self.storage))
215 1
        if node in self._handlers:
216
            raise ua.UaError("Node {0} is already historized".format(node))
217 1
        self.storage.new_historized_node(node.nodeid, period, count)
218 1
        handler = self._sub.subscribe_data_change(node)
219 1
        self._handlers[node] = handler
220
221 1
    def historize_event(self, source, period=timedelta(days=7), count=0):
222
        """
223
        Subscribe to the source nodes' events and store the data in the active storage.
224
225
        SQL Implementation
226
        The default is to historize every event type the source generates, custom event properties are included. At
227
        this time there is no way to historize a specific event type. The user software can filter out events which are
228
        not desired when reading.
229
230
        Note that adding custom events to a source node AFTER historizing has been activated is not supported at this
231
        time (in SQL history there will be no columns in the SQL table for the new event properties). For SQL The table
232
        must be deleted manually so that a new table with the custom event fields can be created.
233
        """
234 1
        if not self._sub:
235 1
            self._sub = self._create_subscription(SubHandler(self.storage))
236 1
        if source in self._handlers:
237
            raise ua.UaError("Events from {0} are already historized".format(source))
238
239
        # get list of all event types that the source node generates; change this to only historize specific events
240 1
        event_types = source.get_referenced_nodes(ua.ObjectIds.GeneratesEvent)
241
242 1
        self.storage.new_historized_event(source.nodeid, event_types, period, count)
243
244 1
        handler = self._sub.subscribe_events(source, event_types)
245 1
        self._handlers[source] = handler
246
247 1
    def dehistorize(self, node):
248
        """
249
        Remove subscription to the node/source which is being historized
250
251
        SQL Implementation
252
        Only the subscriptions is removed. The historical data remains.
253
        """
254 1
        if node in self._handlers:
255 1
            self._sub.unsubscribe(self._handlers[node])
256 1
            del(self._handlers[node])
257
        else:
258
            self.logger.error("History Manager isn't subscribed to %s", node)
259
260 1
    def read_history(self, params):
261
        """
262
        Read history for a node
263
        This is the part AttributeService, but implemented as its own service
264
        since it requires more logic than other attribute service methods
265
        """
266 1
        results = []
267
268 1
        for rv in params.NodesToRead:
269 1
            res = self._read_history(params.HistoryReadDetails, rv)
270 1
            results.append(res)
271 1
        return results
272
273 1
    def _read_history(self, details, rv):
274
        """
275
        determine if the history read is for a data changes or events; then read the history for that node
276
        """
277 1
        result = ua.HistoryReadResult()
278 1
        if isinstance(details, ua.ReadRawModifiedDetails):
279 1
            if details.IsReadModified:
280
                result.HistoryData = ua.HistoryModifiedData()
281
                # we do not support modified history by design so we return what we have
282
            else:
283 1
                result.HistoryData = ua.HistoryData()
284 1
            dv, cont = self._read_datavalue_history(rv, details)
285 1
            result.HistoryData.DataValues = dv
286 1
            result.ContinuationPoint = cont
287
288 1
        elif isinstance(details, ua.ReadEventDetails):
289 1
            result.HistoryData = ua.HistoryEvent()
290
            # FIXME: filter is a cumbersome type, maybe transform it something easier
291
            # to handle for storage
292 1
            ev, cont = self._read_event_history(rv, details)
293 1
            result.HistoryData.Events = ev
294 1
            result.ContinuationPoint = cont
295
296
        else:
297
            # we do not currently support the other types, clients can process data themselves
298
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
299 1
        return result
300
301 1
    def _read_datavalue_history(self, rv, details):
302 1
        starttime = details.StartTime
303 1
        if rv.ContinuationPoint:
304
            # Spec says we should ignore details if cont point is present
305
            # but they also say we can use cont point as timestamp to enable stateless
306
            # implementation. This is contradictory, so we assume details is
307
            # send correctly with continuation point
308
            starttime = ua.ua_binary.Primitives.DateTime.unpack(utils.Buffer(rv.ContinuationPoint))
309
310 1
        dv, cont = self.storage.read_node_history(rv.NodeId,
311
                                                  starttime,
312
                                                  details.EndTime,
313
                                                  details.NumValuesPerNode)
314 1
        if cont:
315 1
            cont = ua.ua_binary.Primitives.DateTime.pack(cont)
316
        # rv.IndexRange
317
        # rv.DataEncoding # xml or binary, seems spec say we can ignore that one
318 1
        return dv, cont
319
320 1
    def _read_event_history(self, rv, details):
321 1
        starttime = details.StartTime
322 1
        if rv.ContinuationPoint:
323
            # Spec says we should ignore details if cont point is present
324
            # but they also say we can use cont point as timestamp to enable stateless
325
            # implementation. This is contradictory, so we assume details is
326
            # send correctly with continuation point
327
            starttime = ua.ua_binary.Primitives.DateTime.unpack(utils.Buffer(rv.ContinuationPoint))
328
329 1
        evts, cont = self.storage.read_event_history(rv.NodeId,
330
                                                     starttime,
331
                                                     details.EndTime,
332
                                                     details.NumValuesPerNode,
333
                                                     details.Filter)
334 1
        results = []
335 1
        for ev in evts:
336
            field_list = ua.HistoryEventFieldList()
337
            field_list.EventFields = ev.to_event_fields(details.Filter.SelectClauses)
338
            results.append(field_list)
339 1
        if cont:
340
            cont = ua.ua_binary.Primitives.DateTime.pack(cont)
341 1
        return results, cont
342
343 1
    def update_history(self, params):
344
        """
345
        Update history for a node
346
        This is the part AttributeService, but implemented as its own service
347
        since it requires more logic than other attribute service methods
348
        """
349
        results = []
350
        for _ in params.HistoryUpdateDetails:
351
            result = ua.HistoryUpdateResult()
352
            # we do not accept to rewrite history
353
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotWritable)
354
            results.append(results)
355
        return results
356
357 1
    def stop(self):
358
        """
359
        call stop methods of active storage interface whenever the server is stopped
360
        """
361
        self.storage.stop()
362