Completed
Pull Request — master (#172)
by
unknown
06:26
created

HistoryManager._read_event_history()   A

Complexity

Conditions 3

Size

Total Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12
Metric Value
cc 3
dl 0
loc 19
ccs 0
cts 0
cp 0
crap 12
rs 9.4285
1 1
from datetime import timedelta
2 1
from datetime import datetime
3
4 1
from opcua import Subscription
5 1
from opcua import ua
6 1
from opcua.common import utils
7
from opcua.common import event
8
9 1
import itertools
10
11
12
class HistoryStorageInterface(object):
    """
    Abstract interface for a history storage backend.

    Concrete backends must override every method; the base implementations
    all raise NotImplementedError.
    """

    def new_historized_node(self, node_id, period, count=0):
        """
        Notify the backend that a new node is to be historized.

        Returns None.
        """
        raise NotImplementedError

    def save_node_value(self, node_id, datavalue):
        """
        Notify the backend that a historized node's value changed and the
        new datavalue should be stored.

        Returns None.
        """
        raise NotImplementedError

    def read_node_history(self, node_id, start, end, nb_values):
        """
        Answer a client history-read request for a node.

        If start or end is missing, nb_values limits the query; nb_values
        is the maximum number of values to read (0 means no limit). Start
        and end times are inclusive.

        Returns a list of DataValues and a continuation point, which is
        None when everything was read, or the ServerTimestamp of the last
        rejected DataValue otherwise.
        """
        raise NotImplementedError

    def new_historized_event(self, source_id, etype, period):
        """
        Notify the backend that event historization was enabled server-side.

        FIXME: we may need to store events per nodes in future...
        Returns None.
        """
        raise NotImplementedError

    def save_event(self, event):
        """
        Notify the backend that a new event was generated and should be
        stored.

        Returns None.
        """
        raise NotImplementedError

    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        """
        Answer a client history-read request for events.

        Start and end times are inclusive.

        Returns a list of Events and a continuation point, which is None
        when everything was read, or the ServerTimestamp of the last
        rejected event otherwise.
        """
        raise NotImplementedError

    def stop(self):
        """
        Notify the backend that the server is shutting down.

        Can be used to close database connections etc.
        """
        raise NotImplementedError
74
75
76
class HistoryDict(HistoryStorageInterface):
    """
    Very minimal history backend storing data in memory using a Python dictionary.
    """

    def __init__(self):
        self._datachanges = {}  # node_id -> list of DataValues, oldest first
        self._datachanges_period = {}  # node_id -> (period, count) retention policy
        self._events = {}  # source_id -> list of historized events

    def new_historized_node(self, node_id, period, count=0):
        """
        Register a node for historization with the given retention policy.
        """
        self._datachanges[node_id] = []
        self._datachanges_period[node_id] = period, count

    def save_node_value(self, node_id, datavalue):
        """
        Append a datavalue to the node's history, then enforce the retention
        policy: drop values older than `period` and keep at most `count`
        values (0 means unlimited).
        """
        data = self._datachanges[node_id]
        period, count = self._datachanges_period[node_id]
        data.append(datavalue)
        now = datetime.utcnow()
        if period:
            # guard on `data` so purging can never raise IndexError
            while data and now - data[0].ServerTimestamp > period:
                data.pop(0)
        if count and len(data) > count:
            # FIX: `data = data[-count:]` only rebound the local name and
            # left the stored list untruncated; write the slice back
            self._datachanges[node_id] = data[-count:]

    def read_node_history(self, node_id, start, end, nb_values):
        """
        Return (values, continuation) for a node, see
        HistoryStorageInterface.read_node_history.

        ua.DateTimeMinValue is used as the "missing" sentinel for start/end;
        a missing start means "most recent first".
        """
        cont = None
        if node_id not in self._datachanges:
            print("Error attempt to read history for a node which is not historized")
            return [], cont
        else:
            if start is None:
                start = ua.DateTimeMinValue
            if end is None:
                end = ua.DateTimeMinValue
            if start == ua.DateTimeMinValue:
                # no start time: newest values first
                results = [dv for dv in reversed(self._datachanges[node_id]) if start <= dv.ServerTimestamp]
            elif end == ua.DateTimeMinValue:
                # no end time: everything from start, oldest first
                results = [dv for dv in self._datachanges[node_id] if start <= dv.ServerTimestamp]
            elif start > end:
                # inverted range: read backwards
                results = [dv for dv in reversed(self._datachanges[node_id]) if end <= dv.ServerTimestamp <= start]
            else:
                results = [dv for dv in self._datachanges[node_id] if start <= dv.ServerTimestamp <= end]
            if nb_values and len(results) > nb_values:
                # FIX: the continuation point is the first value NOT returned,
                # i.e. index nb_values; the old `nb_values + 1` was off by one
                # and raised IndexError when len(results) == nb_values + 1
                cont = results[nb_values].ServerTimestamp
                results = results[:nb_values]
            return results, cont

    def new_historized_event(self, source_id, etype, period):
        # FIX: store the event list per source instead of clobbering the
        # whole _events dict with a bare list
        self._events[source_id] = []

    def save_event(self, event):
        raise NotImplementedError

    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        raise NotImplementedError

    def stop(self):
        # nothing to release for the in-memory backend
        pass
135 1
136 1
137
class SubHandler(object):
    """
    Subscription callback handler that forwards every notification to the
    history storage backend.
    """

    def __init__(self, storage):
        # storage implements HistoryStorageInterface
        self.storage = storage

    def datachange_notification(self, node, val, data):
        """Persist the changed value of a monitored node."""
        datavalue = data.monitored_item.Value
        self.storage.save_node_value(node.nodeid, datavalue)

    def event_notification(self, event):
        """Persist a received event."""
        self.storage.save_event(event)
146 1
147 1
148 1
class HistoryManager(object):
    """
    Server-side history service.

    Captures data changes and events through an internal subscription and
    forwards them to a pluggable storage backend (HistoryStorageInterface);
    answers HistoryRead/HistoryUpdate service requests from that backend.
    """

    def __init__(self, iserver):
        self.iserver = iserver
        self.storage = HistoryDict()  # default in-memory backend; replace via set_storage()
        self._sub = None  # internal subscription, created lazily
        self._handlers = {}  # node -> subscription handle (list of handles for event sources)

    def set_storage(self, storage):
        """
        Set the history storage backend; `storage` must implement
        HistoryStorageInterface.
        """
        self.storage = storage

    def _create_subscription(self, handler):
        # build the internal subscription used to receive notifications
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = 10
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 0
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.iserver.isession, params, handler)

    def historize(self, node, period=timedelta(days=7), count=0):
        """
        Start historizing data changes of `node`.

        `period` is the maximum age of stored values; `count` the maximum
        number of stored values (0 means unlimited).
        Raises ua.UaError if the node is already historized.
        """
        if not self._sub:
            self._sub = self._create_subscription(SubHandler(self.storage))
        if node in self._handlers:
            raise ua.UaError("Node {} is already historized".format(node))
        self.storage.new_historized_node(node.nodeid, period, count)
        handler = self._sub.subscribe_data_change(node)
        self._handlers[node] = handler

    def historize_event(self, source, period=timedelta(days=7)):
        """
        Start historizing events generated by `source`.

        Raises ua.UaError if events from the source are already historized.
        """
        if not self._sub:
            self._sub = self._create_subscription(SubHandler(self.storage))
        if source in self._handlers:  # FIXME a single source might have many event has a handlers, how to check?
            raise ua.UaError("Events from {} are already historized".format(source))

        # get the event types the source node generates and a list of all possible event fields
        event_types, ev_fields = self._get_source_event_data(source)

        self.storage.new_historized_event(source.nodeid, ev_fields, period)

        ev_handlers = []
        for event_type in event_types:
            handler = self._sub.subscribe_events(source, event_type)
            ev_handlers.append(handler)
        self._handlers[source] = ev_handlers  # FIXME no way to dehistorize because of this at the moment

    def dehistorize(self, node):
        """
        Stop historizing data changes of `node`.
        """
        self._sub.unsubscribe(self._handlers[node])
        del self._handlers[node]

    def read_history(self, params):
        """
        Read history for a list of nodes.

        This is part of AttributeService, but implemented as its own service
        since it requires more logic than other attribute service methods.
        """
        results = []
        for rv in params.NodesToRead:
            res = self._read_history(params.HistoryReadDetails, rv)
            results.append(res)
        return results

    def _read_history(self, details, rv):
        """
        Read history for a single node, dispatching on the request type.
        """
        result = ua.HistoryReadResult()
        if isinstance(details, ua.ReadRawModifiedDetails):
            if details.IsReadModified:
                result.HistoryData = ua.HistoryModifiedData()
                # we do not support modified history by design so we return what we have
            else:
                result.HistoryData = ua.HistoryData()
            dv, cont = self._read_datavalue_history(rv, details)
            result.HistoryData.DataValues = dv
            result.ContinuationPoint = cont
        elif isinstance(details, ua.ReadEventDetails):
            result.HistoryData = ua.HistoryEvent()
            # FIXME: filter is a cumbersome type, maybe transform it something easier
            # to handle for storage
            ev, cont = self._read_event_history(rv, details)
            result.HistoryData.Events = ev
            result.ContinuationPoint = cont
        else:
            # we do not currently support the other types, clients can process data themselves
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
        return result

    def _read_datavalue_history(self, rv, details):
        # read raw datavalue history for one node, honouring the continuation point
        starttime = details.StartTime
        if rv.ContinuationPoint:
            # Spec says we should ignore details if cont point is present
            # but they also say we can use cont point as timestamp to enable stateless
            # implementation. This is contradictory, so we assume details is
            # send correctly with continuation point
            starttime = ua.unpack_datetime(utils.Buffer(rv.ContinuationPoint))

        dv, cont = self.storage.read_node_history(rv.NodeId,
                                                  starttime,
                                                  details.EndTime,
                                                  details.NumValuesPerNode)
        if cont:
            # FIX: pack the backend's continuation timestamp itself (the
            # first value NOT returned) instead of the timestamp of the last
            # returned value, which made the client re-read that value
            cont = ua.pack_datetime(cont)
        # FIXME, parse index range and filter out if necessary
        # rv.IndexRange
        # rv.DataEncoding # xml or binary, seems spec say we can ignore that one
        return dv, cont

    def _read_event_history(self, rv, details):
        # read event history for one source node, honouring the continuation point
        starttime = details.StartTime
        if rv.ContinuationPoint:
            # Spec says we should ignore details if cont point is present
            # but they also say we can use cont point as timestamp to enable stateless
            # implementation. This is contradictory, so we assume details is
            # send correctly with continuation point
            starttime = ua.unpack_datetime(utils.Buffer(rv.ContinuationPoint))

        ev, cont = self.storage.read_event_history(rv.NodeId,
                                                   starttime,
                                                   details.EndTime,
                                                   details.NumValuesPerNode,
                                                   details.Filter)
        if cont:
            # FIX: pack the backend's continuation timestamp itself, not the
            # time of the last returned event
            cont = ua.pack_datetime(cont)
        return ev, cont

    def _get_source_event_data(self, source):
        # get all event types which the source node can generate; get the fields of those event types
        event_types = source.get_referenced_nodes(ua.ObjectIds.GeneratesEvent)

        ev_aggregate_fields = []
        for event_type in event_types:
            ev_aggregate_fields.extend(event.get_event_properties_from_type_node(event_type))

        ev_fields = []
        for field in set(ev_aggregate_fields):
            ev_fields.append(field.get_display_name().Text.decode(encoding='utf-8'))
        return event_types, ev_fields

    def update_history(self, params):
        """
        Update history for a node.

        This is part of AttributeService, but implemented as its own service
        since it requires more logic than other attribute service methods.
        We do not accept rewriting history, so every entry is answered with
        BadNotWritable.
        """
        results = []
        for _ in params.HistoryUpdateDetails:
            result = ua.HistoryUpdateResult()
            # we do not accept to rewrite history
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotWritable)
            results.append(result)  # FIX: was results.append(results) (appended the list to itself)
        return results

    def stop(self):
        """
        Shut down the history service by stopping the storage backend.
        """
        self.storage.stop()
310