Completed
Push — master ( eb5e78...c75f71 )
by Olivier
04:30
created

HistoryDict.new_historized_event()   A

Complexity

Conditions 2

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 2.0625

Importance

Changes 0
Metric Value
cc 2
dl 0
loc 5
ccs 3
cts 4
cp 0.75
crap 2.0625
rs 9.4285
c 0
b 0
f 0
1 1
import logging
2 1
from datetime import timedelta
3 1
from datetime import datetime
4
5 1
from opcua import Subscription
6 1
from opcua import ua
7 1
from opcua.common import utils
8 1
9
10
class UaNodeAlreadyHistorizedError(ua.UaError):
    """
    Raised when a node or an event source is registered for historizing
    although it is already being historized
    """
12
13
14
class HistoryStorageInterface(object):

    """
    Contract every history backend has to fulfil.
    Each method below must be overridden by a concrete backend;
    the versions defined here only raise NotImplementedError.
    """

    def new_historized_node(self, node_id, period, count=0):
        """
        Hook invoked when historizing is requested for a new node
        Nothing is returned
        """
        raise NotImplementedError()

    def save_node_value(self, node_id, datavalue):
        """
        Hook invoked when a historized node got a new value which has to be
        stored in history
        Nothing is returned
        """
        raise NotImplementedError()

    def read_node_history(self, node_id, start, end, nb_values):
        """
        Hook invoked when a client issues a history read request for a node
        When start or end is missing, nb_values is used to limit the query
        nb_values is the maximum number of values to read; 0 means no limit
        Both start time and end time are inclusive
        The result is a list of DataValues together with a continuation point,
        the latter being None if all nodes are read or the ServerTimeStamp of
        the last rejected DataValue
        """
        raise NotImplementedError()

    def new_historized_event(self, source_id, evtypes, period, count=0):
        """
        Hook invoked when event historizing is enabled on the server side
        Nothing is returned
        """
        raise NotImplementedError()

    def save_event(self, event):
        """
        Hook invoked when a new event has been generated and has to be stored
        in history
        Nothing is returned
        """
        raise NotImplementedError()

    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        """
        Hook invoked when a client issues a history read request for events
        Both start time and end time are inclusive
        The result is a list of Events together with a continuation point,
        the latter being None if all events are read or the ServerTimeStamp of
        the last rejected event
        """
        raise NotImplementedError()

    def stop(self):
        """
        Hook invoked when the server shuts down
        A backend can use it to close database connections etc.
        """
        raise NotImplementedError()
75 1
76
77
class HistoryDict(HistoryStorageInterface):
    """
    Very minimal history backend storing data in memory using a Python dictionary

    Data values are kept per node id and events per source node id, each in a
    list in the order they were saved. Every list has a retention policy made
    of a ``period`` (maximum age of an entry, falsy to disable) and a ``count``
    (maximum number of entries, 0 to disable).
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self._datachanges = {}
        self._datachanges_period = {}
        self._events = {}
        self._events_periods = {}

    @staticmethod
    def _datavalue_time(datavalue):
        """Timestamp used to age and to filter stored data values."""
        return datavalue.ServerTimestamp

    @staticmethod
    def _event_time(event):
        """Timestamp used to age and to filter stored events."""
        return event.Time

    @staticmethod
    def _apply_retention(entries, period, count, timestamp):
        """
        Drop the oldest items of ``entries`` in place.

        Items older than ``period`` (compared to the current UTC time) are
        removed first, then the list is cut down to at most ``count`` items.
        ``timestamp`` is a callable returning the time of one item.
        """
        if period:
            now = datetime.utcnow()
            while entries and now - timestamp(entries[0]) > period:
                entries.pop(0)
        if count and len(entries) > count:
            del entries[:len(entries) - count]

    @staticmethod
    def _read(entries, start, end, nb_values, timestamp):
        """
        Select the items of ``entries`` lying between ``start`` and ``end``.

        A missing bound (None) is treated as ua.DateTimeMinValue. Items are
        returned oldest first, except when there is no start bound or when
        start is later than end, in which case they are returned newest first.

        Returns ``(results, cont)``. ``cont`` is None unless more than
        ``nb_values`` items matched (``nb_values`` of 0 disables the limit);
        it is then the timestamp of the first item which was not returned, so
        that a follow-up read starting at ``cont`` resumes without a gap.
        """
        if start is None:
            start = ua.DateTimeMinValue
        if end is None:
            end = ua.DateTimeMinValue

        if start == ua.DateTimeMinValue:
            # NOTE(review): ``end`` is not applied in this branch, every stored
            # item is a candidate - confirm this is intended
            results = [item for item in reversed(entries) if start <= timestamp(item)]
        elif end == ua.DateTimeMinValue:
            results = [item for item in entries if start <= timestamp(item)]
        elif start > end:
            results = [item for item in reversed(entries) if end <= timestamp(item) <= start]
        else:
            results = [item for item in entries if start <= timestamp(item) <= end]

        cont = None
        if nb_values and len(results) > nb_values:
            # index nb_values is the first rejected item; nb_values + 1 would
            # skip one item and fail when exactly one item is left over
            cont = timestamp(results[nb_values])
            results = results[:nb_values]
        return results, cont

    def new_historized_node(self, node_id, period, count=0):
        """
        Start keeping history for ``node_id`` with the given retention policy.

        Raises UaNodeAlreadyHistorizedError if the node is already historized.
        """
        if node_id in self._datachanges:
            raise UaNodeAlreadyHistorizedError(node_id)
        self._datachanges[node_id] = []
        self._datachanges_period[node_id] = period, count

    def save_node_value(self, node_id, datavalue):
        """
        Append ``datavalue`` to the history of ``node_id`` and enforce retention.

        Raises KeyError if ``node_id`` is not historized.
        """
        data = self._datachanges[node_id]
        period, count = self._datachanges_period[node_id]
        data.append(datavalue)
        self._apply_retention(data, period, count, self._datavalue_time)

    def read_node_history(self, node_id, start, end, nb_values):
        """
        Return ``(datavalues, continuation_point)`` for ``node_id``.

        Values are filtered on their ServerTimestamp. An unknown node is
        logged as an error and yields ``([], None)``.
        """
        if node_id not in self._datachanges:
            self.logger.error("Error attempt to read history for a node which is not historized")
            return [], None
        return self._read(self._datachanges[node_id], start, end, nb_values, self._datavalue_time)

    def new_historized_event(self, source_id, evtypes, period, count=0):
        """
        Start keeping event history for ``source_id`` with the given retention policy.

        ``evtypes`` is accepted for interface compatibility and not used here.
        Raises UaNodeAlreadyHistorizedError if the source is already historized.
        """
        if source_id in self._events:
            raise UaNodeAlreadyHistorizedError(source_id)
        self._events[source_id] = []
        self._events_periods[source_id] = period, count

    def save_event(self, event):
        """
        Append ``event`` to the history of its SourceNode and enforce retention.

        Raises KeyError if the source node is not historized.
        """
        evts = self._events[event.SourceNode]
        evts.append(event)
        period, count = self._events_periods[event.SourceNode]
        # events are aged on ``Time``, the same attribute read_event_history
        # filters on
        self._apply_retention(evts, period, count, self._event_time)

    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        """
        Return ``(events, continuation_point)`` for ``source_id``.

        Events are filtered on their Time attribute; ``evfilter`` is not used
        by this backend. An unknown source is logged as an error and yields
        ``([], None)``.
        """
        if source_id not in self._events:
            self.logger.error("Error attempt to read event history for a node which does not historize events")
            return [], None
        return self._read(self._events[source_id], start, end, nb_values, self._event_time)

    def stop(self):
        """Nothing to release for the in-memory backend."""
        pass
172
173
174
class SubHandler(object):
    """
    Subscription handler forwarding every notification to a history storage
    """

    def __init__(self, storage):
        # backend receiving the data values and events
        self.storage = storage

    def datachange_notification(self, node, val, data):
        """
        Store the DataValue carried by the monitored item under the node's id
        """
        datavalue = data.monitored_item.Value
        self.storage.save_node_value(node.nodeid, datavalue)

    def event_notification(self, event):
        """
        Store the received event
        """
        self.storage.save_event(event)
183 1
184
185
class HistoryManager(object):
    """
    Server side entry point for historizing nodes and events and for serving
    history read/update requests out of the active storage backend.
    """

    def __init__(self, iserver):
        self.logger = logging.getLogger(__name__)
        self.iserver = iserver
        self.storage = HistoryDict()
        # subscription shared by all historized nodes, created on first use
        self._sub = None
        # node or event source -> handle returned by the subscription
        self._handlers = {}

    def set_storage(self, storage):
        """
        set the desired HistoryStorageInterface which History Manager will use for historizing

        Call this before historizing anything: the subscription handler is
        created once, with the storage active at that moment, and is not
        updated by later calls.
        """
        self.storage = storage

    def _create_subscription(self, handler):
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = 10
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 0
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.iserver.isession, params, handler)

    def _get_subscription(self):
        """Return the shared subscription, creating it on first use."""
        if not self._sub:
            self._sub = self._create_subscription(SubHandler(self.storage))
        return self._sub

    def historize_data_change(self, node, period=timedelta(days=7), count=0):
        """
        Subscribe to the nodes' data changes and store the data in the active storage.

        period and count are handed to the storage as retention policy.
        Raises ua.UaError if the node is already historized by this manager.
        """
        # reject duplicates first so a refused call does not create a subscription
        if node in self._handlers:
            raise ua.UaError("Node {} is already historized".format(node))
        sub = self._get_subscription()
        self.storage.new_historized_node(node.nodeid, period, count)
        handler = sub.subscribe_data_change(node)
        self._handlers[node] = handler

    def historize_event(self, source, period=timedelta(days=7), count=0):
        """
        Subscribe to the source nodes' events and store the data in the active storage.

        period and count are handed to the storage as retention policy.
        Raises ua.UaError if the source is already historized by this manager.

        SQL Implementation
        The default is to historize every event type the source generates, custom event properties are included. At
        this time there is no way to historize a specific event type. The user software can filter out events which are
        not desired when reading.

        Note that adding custom events to a source node AFTER historizing has been activated is not supported at this
        time (in SQL history there will be no columns in the SQL table for the new event properties). For SQL The table
        must be deleted manually so that a new table with the custom event fields can be created.
        """
        # reject duplicates first so a refused call does not create a subscription
        if source in self._handlers:
            raise ua.UaError("Events from {} are already historized".format(source))
        sub = self._get_subscription()

        # get list of all event types that the source node generates; change this to only historize specific events
        event_types = source.get_referenced_nodes(ua.ObjectIds.GeneratesEvent)

        self.storage.new_historized_event(source.nodeid, event_types, period, count)

        handler = sub.subscribe_events(source, event_types)
        self._handlers[source] = handler

    def dehistorize(self, node):
        """
        Remove subscription to the node/source which is being historized

        An unknown node is only logged as an error.

        SQL Implementation
        Only the subscriptions is removed. The historical data remains.
        """
        if node in self._handlers:
            self._sub.unsubscribe(self._handlers[node])
            del self._handlers[node]
        else:
            self.logger.error("History Manager isn't subscribed to %s", node)

    def read_history(self, params):
        """
        Read history for a node
        This is the part AttributeService, but implemented as its own service
        since it requires more logic than other attribute service methods

        Returns one HistoryReadResult per entry of params.NodesToRead.
        """
        return [self._read_history(params.HistoryReadDetails, rv) for rv in params.NodesToRead]

    def _read_history(self, details, rv):
        """
        determine if the history read is for a data changes or events; then read the history for that node
        """
        result = ua.HistoryReadResult()
        if isinstance(details, ua.ReadRawModifiedDetails):
            if details.IsReadModified:
                result.HistoryData = ua.HistoryModifiedData()
                # we do not support modified history by design so we return what we have
            else:
                result.HistoryData = ua.HistoryData()
            dv, cont = self._read_datavalue_history(rv, details)
            result.HistoryData.DataValues = dv
            result.ContinuationPoint = cont

        elif isinstance(details, ua.ReadEventDetails):
            result.HistoryData = ua.HistoryEvent()
            # FIXME: filter is a cumbersome type, maybe transform it something easier
            # to handle for storage
            ev, cont = self._read_event_history(rv, details)
            result.HistoryData.Events = ev
            result.ContinuationPoint = cont

        else:
            # we do not currently support the other types, clients can process data themselves
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
        return result

    @staticmethod
    def _start_time(rv, details):
        """
        Return the start time of a read: the timestamp packed in the
        continuation point when there is one, details.StartTime otherwise
        """
        if rv.ContinuationPoint:
            # Spec says we should ignore details if cont point is present
            # but they also say we can use cont point as timestamp to enable stateless
            # implementation. This is contradictory, so we assume details is
            # send correctly with continuation point
            return ua.unpack_datetime(utils.Buffer(rv.ContinuationPoint))
        return details.StartTime

    def _read_datavalue_history(self, rv, details):
        """
        Read data values for rv.NodeId from the storage; the continuation
        point, if any, is returned packed as a datetime
        """
        dv, cont = self.storage.read_node_history(rv.NodeId,
                                                  self._start_time(rv, details),
                                                  details.EndTime,
                                                  details.NumValuesPerNode)
        if cont:
            cont = ua.pack_datetime(cont)
        # rv.IndexRange
        # rv.DataEncoding # xml or binary, seems spec say we can ignore that one
        return dv, cont

    def _read_event_history(self, rv, details):
        """
        Read events for rv.NodeId from the storage and convert each of them to
        a HistoryEventFieldList using the select clauses of details.Filter;
        the continuation point, if any, is returned packed as a datetime
        """
        evts, cont = self.storage.read_event_history(rv.NodeId,
                                                     self._start_time(rv, details),
                                                     details.EndTime,
                                                     details.NumValuesPerNode,
                                                     details.Filter)
        results = []
        for ev in evts:
            field_list = ua.HistoryEventFieldList()
            field_list.EventFields = ev.to_event_fields(details.Filter.SelectClauses)
            results.append(field_list)
        if cont:
            cont = ua.pack_datetime(cont)
        return results, cont

    def update_history(self, params):
        """
        Update history for a node
        This is the part AttributeService, but implemented as its own service
        since it requires more logic than other attribute service methods

        Returns one HistoryUpdateResult with status BadNotWritable per entry
        of params.HistoryUpdateDetails.
        """
        results = []
        for _ in params.HistoryUpdateDetails:
            result = ua.HistoryUpdateResult()
            # we do not accept to rewrite history
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotWritable)
            # append the result object, not the list itself
            results.append(result)
        return results

    def stop(self):
        """
        call stop methods of active storage interface whenever the server is stopped
        """
        self.storage.stop()
362