Completed
Pull Request — master (#204)
by Olivier
04:05
created

HistoryManager.historize_data_change()   A

Complexity

Conditions 3

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
c 0
b 0
f 0
dl 0
loc 11
ccs 4
cts 4
cp 1
crap 3
rs 9.4285
1 1
import logging
2 1
from datetime import timedelta
3 1
from datetime import datetime
4
5 1
from opcua import Subscription
6 1
from opcua import ua
7 1
from opcua.common import utils
8 1
from opcua.common import events
9
10
11 1
class UaNodeAlreadyHistorizedError(ua.UaError):
    """Raised when history is requested for a node or event source that a storage already historizes."""
13
14
15
class HistoryStorageInterface(object):
    """
    Contract every history backend has to fulfil.

    All methods raise NotImplementedError here; a concrete backend must
    override each of them.
    """

    def new_historized_node(self, node_id, period, count=0):
        """
        Hook invoked when historizing of a new node starts.

        Returns None
        """
        raise NotImplementedError()

    def save_node_value(self, node_id, datavalue):
        """
        Hook invoked whenever a historized node got a new value which has to
        be stored in history.

        Returns None
        """
        raise NotImplementedError()

    def read_node_history(self, node_id, start, end, nb_values):
        """
        Hook invoked when a client issues a history read request for a node.

        If start or end is missing, nb_values is used to limit the query.
        nb_values is the maximum number of values to read; it is ignored if 0.
        Start time and end time are inclusive.

        Returns a list of DataValues and a continuation point. The
        continuation point is None if all nodes are read, otherwise the
        ServerTimeStamp of the last rejected DataValue.
        """
        raise NotImplementedError()

    def new_historized_event(self, source_id, etype, period, count=0):
        """
        Hook invoked when historizing of events is enabled on server side.

        Returns None
        """
        raise NotImplementedError()

    def save_event(self, event):
        """
        Hook invoked when a new event has been generated and should be stored
        in history.

        Returns None
        """
        raise NotImplementedError()

    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        """
        Hook invoked when a client issues a history read request for events.

        Start time and end time are inclusive.

        Returns a list of Events and a continuation point. The continuation
        point is None if all events are read, otherwise the ServerTimeStamp
        of the last rejected event.
        """
        raise NotImplementedError()

    def stop(self):
        """
        Hook invoked when the server shuts down.

        Can be used to close database connections etc.
        """
        raise NotImplementedError()
76
77
78
class HistoryDict(HistoryStorageInterface):
    """
    Very minimal history backend storing data in memory using Python dictionaries.

    Values and events are kept in plain lists, oldest first, in the order in
    which they were saved. Each historized node / event source has its own
    list and its own (period, count) retention settings.
    """

    def __init__(self):
        # node_id -> list of saved DataValues (oldest first)
        self._datachanges = {}
        # node_id -> (period, count) retention settings
        self._datachanges_period = {}
        # source_id -> list of saved events (oldest first)
        self._events = {}
        # source_id -> (period, count) retention settings
        self._events_periods = {}

    @staticmethod
    def _trim(items, get_time, period, count):
        """
        Apply the retention settings to ``items`` in place.

        Entries older than ``period`` (measured against the current UTC time)
        and entries in excess of ``count`` are removed from the front of the
        list. A falsy ``period`` or ``count`` disables the respective limit.
        """
        if period:
            now = datetime.utcnow()
            while items and now - get_time(items[0]) > period:
                items.pop(0)
        if count and len(items) > count:
            # drop everything above the limit, not just one entry, so the
            # bound also holds if the list ever got ahead of it
            del items[:len(items) - count]

    @staticmethod
    def _select(items, get_time, start, end, nb_values):
        """
        Pick the entries of ``items`` lying in the inclusive range start..end.

        A missing bound (None or ua.DateTimeMinValue) is treated as open:
        - no start: entries are returned newest first, limited by ``end``
          when it is given
        - no end: entries are returned oldest first beginning at ``start``
        - start > end: entries in the range are returned newest first

        ``nb_values`` limits the number of returned entries (0 = no limit).

        Returns (results, cont); cont is None when nothing was cut off,
        otherwise the timestamp of the first entry which was not returned.
        """
        if start is None:
            start = ua.DateTimeMinValue
        if end is None:
            end = ua.DateTimeMinValue

        if start == ua.DateTimeMinValue:
            if end == ua.DateTimeMinValue:
                results = list(reversed(items))
            else:
                # the end bound still applies when only start is missing
                results = [item for item in reversed(items) if get_time(item) <= end]
        elif end == ua.DateTimeMinValue:
            results = [item for item in items if start <= get_time(item)]
        elif start > end:
            results = [item for item in reversed(items) if end <= get_time(item) <= start]
        else:
            results = [item for item in items if start <= get_time(item) <= end]

        cont = None
        if nb_values and len(results) > nb_values:
            # results[nb_values] is the first entry left out; indexing one
            # further would skip an entry and can run past the end of the list
            cont = get_time(results[nb_values])
            results = results[:nb_values]
        return results, cont

    def new_historized_node(self, node_id, period, count=0):
        """
        Start keeping history for ``node_id`` with the given retention settings.

        Raises UaNodeAlreadyHistorizedError if the node is already historized.
        """
        if node_id in self._datachanges:
            raise UaNodeAlreadyHistorizedError(node_id)
        self._datachanges[node_id] = []
        self._datachanges_period[node_id] = period, count

    def save_node_value(self, node_id, datavalue):
        """
        Append ``datavalue`` to the history of ``node_id`` and apply retention.

        Raises KeyError if the node has not been registered with
        new_historized_node.
        """
        data = self._datachanges[node_id]
        period, count = self._datachanges_period[node_id]
        data.append(datavalue)
        self._trim(data, lambda dv: dv.ServerTimestamp, period, count)

    def read_node_history(self, node_id, start, end, nb_values):
        """
        Return (datavalues, continuation point) for ``node_id``.

        Values are selected on their ServerTimestamp. A node which is not
        historized yields an empty list and no continuation point.
        """
        if node_id not in self._datachanges:
            logging.getLogger(__name__).error(
                "Error attempt to read history for a node which is not historized")
            return [], None
        return self._select(self._datachanges[node_id],
                            lambda dv: dv.ServerTimestamp,
                            start, end, nb_values)

    def new_historized_event(self, source_id, etype, period, count=0):
        """
        Start keeping event history for ``source_id``.

        ``etype`` is accepted for interface compatibility but not used by
        this backend.

        Raises UaNodeAlreadyHistorizedError if the source is already historized.
        """
        if source_id in self._events:
            raise UaNodeAlreadyHistorizedError(source_id)
        self._events[source_id] = []
        self._events_periods[source_id] = period, count

    def save_event(self, event):
        """
        Append ``event`` to the history of its SourceNode and apply retention.

        Raises KeyError if the source has not been registered with
        new_historized_event.
        """
        evts = self._events[event.SourceNode]
        evts.append(event)
        period, count = self._events_periods[event.SourceNode]
        # events are aged on their Time field, the same field
        # read_event_history selects on
        self._trim(evts, lambda ev: ev.Time, period, count)

    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        """
        Return (events, continuation point) for ``source_id``.

        Events are selected on their Time field. ``evfilter`` is not applied
        by this backend. A source which does not historize events yields an
        empty list and no continuation point.
        """
        if source_id not in self._events:
            logging.getLogger(__name__).error(
                "Error attempt to read event history for a node which does not historize events")
            return [], None
        return self._select(self._events[source_id],
                            lambda ev: ev.Time,
                            start, end, nb_values)

    def stop(self):
        """Nothing to release for the in-memory backend."""
        pass
172
173
174
class SubHandler(object):
    """Subscription callback object which forwards every notification to a history storage."""

    def __init__(self, storage):
        self.storage = storage

    def datachange_notification(self, node, val, data):
        # val is not used: the value found on the monitored item is what
        # gets handed to the storage
        save_value = self.storage.save_node_value
        save_value(node.nodeid, data.monitored_item.Value)

    def event_notification(self, event):
        save_event = self.storage.save_event
        save_event(event)
183 1
184
185
class HistoryManager(object):
    """
    Historizes data changes and events of server nodes and answers history
    read / update requests.

    Saving is done through one internal subscription whose handler writes
    into the active storage; reading is delegated to the active storage.
    """

    def __init__(self, iserver):
        self.logger = logging.getLogger(__name__)
        self.iserver = iserver
        # in-memory storage is the default, see set_storage()
        self.storage = HistoryDict()
        # subscription used for historizing, created on first use
        self._sub = None
        # node / event source -> subscription handle
        self._handlers = {}

    def set_storage(self, storage):
        """
        set the desired HistoryStorageInterface which History Manager will use for historizing

        NOTE: the subscription handler is created with the storage active at
        the time of the first historize_* call; a storage set afterwards is
        used for reading but not by that already existing handler.
        """
        self.storage = storage

    def _create_subscription(self, handler):
        """Create the internal subscription delivering notifications to ``handler``."""
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = 10
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 0
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.iserver.isession, params, handler)

    def historize_data_change(self, node, period=timedelta(days=7), count=0):
        """
        Subscribe to the nodes' data changes and store the data in the active storage.

        ``period`` and ``count`` are passed on to the storage as retention
        settings. Raises ua.UaError if the node is already historized.
        """
        if not self._sub:
            self._sub = self._create_subscription(SubHandler(self.storage))
        if node in self._handlers:
            raise ua.UaError("Node {} is already historized".format(node))
        self.storage.new_historized_node(node.nodeid, period, count)
        handler = self._sub.subscribe_data_change(node)
        self._handlers[node] = handler

    def historize_event(self, source, period=timedelta(days=7), count=0):
        """
        Subscribe to the source nodes' events and store the data in the active storage.

        SQL Implementation
        The default is to historize every event type the source generates, custom event properties are included. At
        this time there is no way to historize a specific event type. The user software can filter out events which are
        not desired when reading.

        Note that adding custom events to a source node AFTER historizing has been activated is not supported at this
        time (in SQL history there will be no columns in the SQL table for the new event properties). For SQL The table
        must be deleted manually so that a new table with the custom event fields can be created.

        Raises ua.UaError if events of the source are already historized.
        """
        if not self._sub:
            self._sub = self._create_subscription(SubHandler(self.storage))
        if source in self._handlers:
            raise ua.UaError("Events from {} are already historized".format(source))

        # get the event types the source node generates and a list of all possible event fields
        event_types, ev_fields = self._get_source_event_data(source)

        # FIXME passing ev_fields instead of event type only works because HistoryDict doesn't use this parameter,
        # FIXME SQL needs to be fixed to get the fields in the SQL module, not here; only event types should be here
        self.storage.new_historized_event(source.nodeid, ev_fields, period, count)

        handler = self._sub.subscribe_events(source, event_types)
        self._handlers[source] = handler

    def dehistorize(self, node):
        """
        Remove subscription to the node/source which is being historized

        SQL Implementation
        Only the subscriptions is removed. The historical data remains.

        An unknown node is logged as an error, no exception is raised.
        """
        if node in self._handlers:
            self._sub.unsubscribe(self._handlers[node])
            self._handlers.pop(node)
        else:
            self.logger.error("History Manager isn't subscribed to %s", node)

    def read_history(self, params):
        """
        Read history for a node
        This is the part AttributeService, but implemented as its own service
        since it requires more logic than other attribute service methods

        Returns one HistoryReadResult per entry of params.NodesToRead, in the
        same order.
        """
        return [self._read_history(params.HistoryReadDetails, rv)
                for rv in params.NodesToRead]

    def _read_history(self, details, rv):
        """
        determine if the history read is for a data changes or events; then read the history for that node

        Any other kind of details yields a result with status BadNotImplemented.
        """
        result = ua.HistoryReadResult()
        if isinstance(details, ua.ReadRawModifiedDetails):
            if details.IsReadModified:
                result.HistoryData = ua.HistoryModifiedData()
                # we do not support modified history by design so we return what we have
            else:
                result.HistoryData = ua.HistoryData()
            dv, cont = self._read_datavalue_history(rv, details)
            result.HistoryData.DataValues = dv
            result.ContinuationPoint = cont

        elif isinstance(details, ua.ReadEventDetails):
            result.HistoryData = ua.HistoryEvent()
            # FIXME: filter is a cumbersome type, maybe transform it something easier
            # to handle for storage
            ev, cont = self._read_event_history(rv, details)
            result.HistoryData.Events = ev
            result.ContinuationPoint = cont

        else:
            # we do not currently support the other types, clients can process data themselves
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
        return result

    def _read_datavalue_history(self, rv, details):
        """
        Read data value history of rv.NodeId from the storage.

        Returns (datavalues, continuation point); the continuation point is
        the packed timestamp returned by the storage or a falsy value.
        """
        starttime = details.StartTime
        if rv.ContinuationPoint:
            # Spec says we should ignore details if cont point is present
            # but they also say we can use cont point as timestamp to enable stateless
            # implementation. This is contradictory, so we assume details is
            # send correctly with continuation point
            starttime = ua.unpack_datetime(utils.Buffer(rv.ContinuationPoint))

        dv, cont = self.storage.read_node_history(rv.NodeId,
                                                  starttime,
                                                  details.EndTime,
                                                  details.NumValuesPerNode)
        if cont:
            cont = ua.pack_datetime(cont)
        # rv.IndexRange
        # rv.DataEncoding # xml or binary, seems spec say we can ignore that one
        return dv, cont

    def _read_event_history(self, rv, details):
        """
        Read event history of rv.NodeId from the storage.

        Each event is converted to a HistoryEventFieldList holding the fields
        selected by details.Filter.SelectClauses.

        Returns (field lists, continuation point); the continuation point is
        the packed timestamp returned by the storage or a falsy value.
        """
        starttime = details.StartTime
        if rv.ContinuationPoint:
            # Spec says we should ignore details if cont point is present
            # but they also say we can use cont point as timestamp to enable stateless
            # implementation. This is contradictory, so we assume details is
            # send correctly with continuation point
            starttime = ua.unpack_datetime(utils.Buffer(rv.ContinuationPoint))

        evts, cont = self.storage.read_event_history(rv.NodeId,
                                                     starttime,
                                                     details.EndTime,
                                                     details.NumValuesPerNode,
                                                     details.Filter)
        results = []
        for ev in evts:
            field_list = ua.HistoryEventFieldList()
            field_list.EventFields = ev.to_event_fields(details.Filter.SelectClauses)
            results.append(field_list)
        if cont:
            cont = ua.pack_datetime(cont)
        return results, cont

    def _get_source_event_data(self, source):
        """
        Return (event_types, ev_fields) for ``source``.

        event_types are the nodes referenced by the source through
        GeneratesEvent, ev_fields the display names of the properties of
        those event types without duplicates.
        """
        # get all event types which the source node can generate; get the fields of those event types
        event_types = source.get_referenced_nodes(ua.ObjectIds.GeneratesEvent)

        ev_aggregate_fields = []
        for event_type in event_types:
            ev_aggregate_fields.extend(events.get_event_properties_from_type_node(event_type))

        ev_fields = []
        for field in set(ev_aggregate_fields):
            ev_fields.append(field.get_display_name().Text.decode(encoding='utf-8'))
        return event_types, ev_fields

    def update_history(self, params):
        """
        Update history for a node
        This is the part AttributeService, but implemented as its own service
        since it requires more logic than other attribute service methods

        Returns one HistoryUpdateResult with status BadNotWritable per entry
        of params.HistoryUpdateDetails.
        """
        results = []
        for _ in params.HistoryUpdateDetails:
            result = ua.HistoryUpdateResult()
            # we do not accept to rewrite history
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotWritable)
            # append the result object itself, not the list being built
            results.append(result)
        return results

    def stop(self):
        """
        call stop methods of active storage interface whenever the server is stopped
        """
        self.storage.stop()
377