Test Failed
Pull Request — master (#490)
by Olivier
06:42
created

UaNodeAlreadyHistorizedError

Complexity

Total Complexity 0

Size/Duplication

Total Lines 2
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 2
ccs 0
cts 2
cp 0
wmc 0
1
import logging
2
from datetime import timedelta
3
from datetime import datetime
4
5
from opcua import Subscription
6
from opcua import ua
7
from opcua.common import utils
8
9
10
class UaNodeAlreadyHistorizedError(ua.UaError):
11
    pass
12
13
14
class HistoryStorageInterface(object):
15
16
    """
17
    Interface of a history backend.
18
    Must be implemented by backends
19
    """
20
21
    def new_historized_node(self, node_id, period, count=0):
22
        """
23
        Called when a new node is to be historized
24
        Returns None
25
        """
26
        raise NotImplementedError
27
28
    def save_node_value(self, node_id, datavalue):
29
        """
30
        Called when the value of a historized node has changed and should be saved in history
31
        Returns None
32
        """
33
        raise NotImplementedError
34
35
    def read_node_history(self, node_id, start, end, nb_values):
36
        """
37
        Called when a client make a history read request for a node
38
        if start or end is missing then nb_values is used to limit query
39
        nb_values is the max number of values to read. Ignored if 0
40
        Start time and end time are inclusive
41
        Returns a list of DataValues and a continuation point which
42
        is None if all nodes are read or the ServerTimeStamp of the last rejected DataValue
43
        """
44
        raise NotImplementedError
45
46
    def new_historized_event(self, source_id, evtypes, period, count=0):
47
        """
48
        Called when historization of events is enabled on server side
49
        Returns None
50
        """
51
        raise NotImplementedError
52
53
    def save_event(self, event):
54
        """
55
        Called when a new event has been generated ans should be saved in history
56
        Returns None
57
        """
58
        raise NotImplementedError
59
60
    def read_event_history(self, source_id, start, end, nb_values, evfilter):
61
        """
62
        Called when a client make a history read request for events
63
        Start time and end time are inclusive
64
        Returns a list of Events and a continuation point which
65
        is None if all events are read or the ServerTimeStamp of the last rejected event
66
        """
67
        raise NotImplementedError
68
69
    def stop(self):
70
        """
71
        Called when the server shuts down
72
        Can be used to close database connections etc.
73
        """
74
        raise NotImplementedError
75
76
77
class HistoryDict(HistoryStorageInterface):
78
    """
79
    Very minimal history backend storing data in memory using a Python dictionary
80
    """
81
82
    def __init__(self):
83
        self._datachanges = {}
84
        self._datachanges_period = {}
85
        self._events = {}
86
        self._events_periods = {}
87
88
    def new_historized_node(self, node_id, period, count=0):
89
        if node_id in self._datachanges:
90
            raise UaNodeAlreadyHistorizedError(node_id)
91
        self._datachanges[node_id] = []
92
        self._datachanges_period[node_id] = period, count
93
94
    def save_node_value(self, node_id, datavalue):
95
        data = self._datachanges[node_id]
96
        period, count = self._datachanges_period[node_id]
97
        data.append(datavalue)
98
        now = datetime.utcnow()
99
        if period:
100
            while len(data) and now - data[0].ServerTimestamp > period:
101
                data.pop(0)
102
        if count and len(data) > count:
103
            data.pop(0)
104
105 View Code Duplication
    def read_node_history(self, node_id, start, end, nb_values):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
106
        cont = None
107
        if node_id not in self._datachanges:
108
            print("Error attempt to read history for a node which is not historized")
109
            return [], cont
110
        else:
111
            if start is None:
112
                start = ua.get_win_epoch()
113
            if end is None:
114
                end = ua.get_win_epoch()
115
            if start == ua.get_win_epoch():
116
                results = [dv for dv in reversed(self._datachanges[node_id]) if start <= dv.ServerTimestamp]
117
            elif end == ua.get_win_epoch():
118
                results = [dv for dv in self._datachanges[node_id] if start <= dv.ServerTimestamp]
119
            elif start > end:
120
                results = [dv for dv in reversed(self._datachanges[node_id]) if end <= dv.ServerTimestamp <= start]
121
122
            else:
123
                results = [dv for dv in self._datachanges[node_id] if start <= dv.ServerTimestamp <= end]
124
            if nb_values and len(results) > nb_values:
125
                cont = results[nb_values + 1].ServerTimestamp
126
                results = results[:nb_values]
127
            return results, cont
128
129
    def new_historized_event(self, source_id, evtypes, period, count=0):
130
        if source_id in self._events:
131
            raise UaNodeAlreadyHistorizedError(source_id)
132
        self._events[source_id] = []
133
        self._events_periods[source_id] = period, count
134
135
    def save_event(self, event):
136
        evts = self._events[event.SourceNode]
137
        evts.append(event)
138
        period, count = self._events_periods[event.SourceNode]
139
        now = datetime.utcnow()
140
        if period:
141
            while len(evts) and now - evts[0].ServerTimestamp > period:
142
                evts.pop(0)
143
        if count and len(evts) > count:
144
            evts.pop(0)
145
146 View Code Duplication
    def read_event_history(self, source_id, start, end, nb_values, evfilter):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
147
        cont = None
148
        if source_id not in self._events:
149
            print("Error attempt to read event history for a node which does not historize events")
150
            return [], cont
151
        else:
152
            if start is None:
153
                start = ua.get_win_epoch()
154
            if end is None:
155
                end = ua.get_win_epoch()
156
            if start == ua.get_win_epoch():
157
                results = [ev for ev in reversed(self._events[source_id]) if start <= ev.Time]
158
            elif end == ua.get_win_epoch():
159
                results = [ev for ev in self._events[source_id] if start <= ev.Time]
160
            elif start > end:
161
                results = [ev for ev in reversed(self._events[source_id]) if end <= ev.Time <= start]
162
163
            else:
164
                results = [ev for ev in self._events[source_id] if start <= ev.Time <= end]
165
            if nb_values and len(results) > nb_values:
166
                cont = results[nb_values + 1].Time
167
                results = results[:nb_values]
168
            return results, cont
169
170
    def stop(self):
171
        pass
172
173
174
class SubHandler(object):
175
    def __init__(self, storage):
176
        self.storage = storage
177
178
    def datachange_notification(self, node, val, data):
179
        self.storage.save_node_value(node.nodeid, data.monitored_item.Value)
180
181
    def event_notification(self, event):
182
        self.storage.save_event(event)
183
184
185
class HistoryManager(object):
186
    def __init__(self, iserver):
187
        self.logger = logging.getLogger(__name__)
188
        self.iserver = iserver
189
        self.storage = HistoryDict()
190
        self._sub = None
191
        self._handlers = {}
192
193
    def set_storage(self, storage):
194
        """
195
        set the desired HistoryStorageInterface which History Manager will use for historizing
196
        """
197
        self.storage = storage
198
199
    def _create_subscription(self, handler):
200
        params = ua.CreateSubscriptionParameters()
201
        params.RequestedPublishingInterval = 10
202
        params.RequestedLifetimeCount = 3000
203
        params.RequestedMaxKeepAliveCount = 10000
204
        params.MaxNotificationsPerPublish = 0
205
        params.PublishingEnabled = True
206
        params.Priority = 0
207
        return Subscription(self.iserver.isession, params, handler)
208
209
    def historize_data_change(self, node, period=timedelta(days=7), count=0):
210
        """
211
        Subscribe to the nodes' data changes and store the data in the active storage.
212
        """
213
        if not self._sub:
214
            self._sub = self._create_subscription(SubHandler(self.storage))
215
        if node in self._handlers:
216
            raise ua.UaError("Node {0} is already historized".format(node))
217
        self.storage.new_historized_node(node.nodeid, period, count)
218
        handler = self._sub.subscribe_data_change(node)
219
        self._handlers[node] = handler
220
221
    def historize_event(self, source, period=timedelta(days=7), count=0):
222
        """
223
        Subscribe to the source nodes' events and store the data in the active storage.
224
225
        SQL Implementation
226
        The default is to historize every event type the source generates, custom event properties are included. At
227
        this time there is no way to historize a specific event type. The user software can filter out events which are
228
        not desired when reading.
229
230
        Note that adding custom events to a source node AFTER historizing has been activated is not supported at this
231
        time (in SQL history there will be no columns in the SQL table for the new event properties). For SQL The table
232
        must be deleted manually so that a new table with the custom event fields can be created.
233
        """
234
        if not self._sub:
235
            self._sub = self._create_subscription(SubHandler(self.storage))
236
        if source in self._handlers:
237
            raise ua.UaError("Events from {0} are already historized".format(source))
238
239
        # get list of all event types that the source node generates; change this to only historize specific events
240
        event_types = source.get_referenced_nodes(ua.ObjectIds.GeneratesEvent)
241
242
        self.storage.new_historized_event(source.nodeid, event_types, period, count)
243
244
        handler = self._sub.subscribe_events(source, event_types)
245
        self._handlers[source] = handler
246
247
    def dehistorize(self, node):
248
        """
249
        Remove subscription to the node/source which is being historized
250
251
        SQL Implementation
252
        Only the subscriptions is removed. The historical data remains.
253
        """
254
        if node in self._handlers:
255
            self._sub.unsubscribe(self._handlers[node])
256
            del(self._handlers[node])
257
        else:
258
            self.logger.error("History Manager isn't subscribed to %s", node)
259
260
    def read_history(self, params):
261
        """
262
        Read history for a node
263
        This is the part AttributeService, but implemented as its own service
264
        since it requires more logic than other attribute service methods
265
        """
266
        results = []
267
268
        for rv in params.NodesToRead:
269
            res = self._read_history(params.HistoryReadDetails, rv)
270
            results.append(res)
271
        return results
272
273
    def _read_history(self, details, rv):
274
        """
275
        determine if the history read is for a data changes or events; then read the history for that node
276
        """
277
        result = ua.HistoryReadResult()
278
        if isinstance(details, ua.ReadRawModifiedDetails):
279
            if details.IsReadModified:
280
                result.HistoryData = ua.HistoryModifiedData()
281
                # we do not support modified history by design so we return what we have
282
            else:
283
                result.HistoryData = ua.HistoryData()
284
            dv, cont = self._read_datavalue_history(rv, details)
285
            result.HistoryData.DataValues = dv
286
            result.ContinuationPoint = cont
287
288
        elif isinstance(details, ua.ReadEventDetails):
289
            result.HistoryData = ua.HistoryEvent()
290
            # FIXME: filter is a cumbersome type, maybe transform it something easier
291
            # to handle for storage
292
            ev, cont = self._read_event_history(rv, details)
293
            result.HistoryData.Events = ev
294
            result.ContinuationPoint = cont
295
296
        else:
297
            # we do not currently support the other types, clients can process data themselves
298
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
299
        return result
300
301
    def _read_datavalue_history(self, rv, details):
302
        starttime = details.StartTime
303
        if rv.ContinuationPoint:
304
            # Spec says we should ignore details if cont point is present
305
            # but they also say we can use cont point as timestamp to enable stateless
306
            # implementation. This is contradictory, so we assume details is
307
            # send correctly with continuation point
308
            starttime = ua.ua_binary.Primitives.DateTime.unpack(utils.Buffer(rv.ContinuationPoint))
309
310
        dv, cont = self.storage.read_node_history(rv.NodeId,
311
                                                  starttime,
312
                                                  details.EndTime,
313
                                                  details.NumValuesPerNode)
314
        if cont:
315
            cont = ua.ua_binary.Primitives.DateTime.pack(cont)
316
        # rv.IndexRange
317
        # rv.DataEncoding # xml or binary, seems spec say we can ignore that one
318
        return dv, cont
319
320
    def _read_event_history(self, rv, details):
321
        starttime = details.StartTime
322
        if rv.ContinuationPoint:
323
            # Spec says we should ignore details if cont point is present
324
            # but they also say we can use cont point as timestamp to enable stateless
325
            # implementation. This is contradictory, so we assume details is
326
            # send correctly with continuation point
327
            starttime = ua.ua_binary.Primitives.DateTime.unpack(utils.Buffer(rv.ContinuationPoint))
328
329
        evts, cont = self.storage.read_event_history(rv.NodeId,
330
                                                     starttime,
331
                                                     details.EndTime,
332
                                                     details.NumValuesPerNode,
333
                                                     details.Filter)
334
        results = []
335
        for ev in evts:
336
            field_list = ua.HistoryEventFieldList()
337
            field_list.EventFields = ev.to_event_fields(details.Filter.SelectClauses)
338
            results.append(field_list)
339
        if cont:
340
            cont = ua.ua_binary.Primitives.DateTime.pack(cont)
341
        return results, cont
342
343
    def update_history(self, params):
344
        """
345
        Update history for a node
346
        This is the part AttributeService, but implemented as its own service
347
        since it requires more logic than other attribute service methods
348
        """
349
        results = []
350
        for _ in params.HistoryUpdateDetails:
351
            result = ua.HistoryUpdateResult()
352
            # we do not accept to rewrite history
353
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotWritable)
354
            results.append(results)
355
        return results
356
357
    def stop(self):
358
        """
359
        call stop methods of active storage interface whenever the server is stopped
360
        """
361
        self.storage.stop()
362