Completed
Pull Request — master (#180)
Created by unknown at 03:09

HistoryManager._read_event_history() — Rating: A

Complexity
    Conditions: 3

Size
    Total Lines: 19

Duplication
    Lines: 0
    Ratio: 0 %

Code Coverage
    Tests: 0
    CRAP Score: 12

Metric    Value
cc        3
dl        0
loc       19
ccs       0
cts       0
cp        0
crap      12
rs        9.4285
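For reference, the reported CRAP score of 12 is consistent with the standard CRAP formula, CRAP(m) = comp(m)^2 * (1 - cov(m)/100)^3 + comp(m), assuming this report uses that definition: with a cyclomatic complexity (cc) of 3 and 0 % coverage (cp) this gives 9 * 1 + 3 = 12. A quick sketch of the computation:

    # Hypothetical check of the reported CRAP score, assuming the standard formula
    comp, cov = 3, 0.0   # cyclomatic complexity (cc) and coverage (cp) from the table above
    crap = comp ** 2 * (1 - cov / 100) ** 3 + comp
    print(crap)          # 12.0, matching the value reported above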
import logging
from datetime import timedelta
from datetime import datetime

from opcua import Subscription
from opcua import ua
from opcua.common import utils
from opcua.common import subscription


class HistoryStorageInterface(object):

    """
    Interface of a history backend.
    Must be implemented by backends
    """

    def new_historized_node(self, node_id, period, count=0):
        """
        Called when a new node is to be historized
        Returns None
        """
        raise NotImplementedError

    def save_node_value(self, node_id, datavalue):
        """
        Called when the value of a historized node has changed and should be saved in history
        Returns None
        """
        raise NotImplementedError

    def read_node_history(self, node_id, start, end, nb_values):
        """
        Called when a client makes a history read request for a node
        If start or end is missing then nb_values is used to limit the query
        nb_values is the max number of values to read. Ignored if 0
        Start time and end time are inclusive
        Returns a list of DataValues and a continuation point, which
        is None if all values are read or the ServerTimestamp of the first value not returned
        """
        raise NotImplementedError

    def new_historized_event(self, source_id, etype, period):
        """
        Called when historization of events is enabled on server side
        FIXME: we may need to store events per node in the future...
        Returns None
        """
        raise NotImplementedError

    def save_event(self, event):
        """
        Called when a new event has been generated and should be saved in history
        Returns None
        """
        raise NotImplementedError

    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        """
        Called when a client makes a history read request for events
        Start time and end time are inclusive
        Returns a list of Events and a continuation point, which
        is None if all events are read or the ServerTimestamp of the first event not returned
        """
        raise NotImplementedError

    def stop(self):
        """
        Called when the server shuts down
        Can be used to close database connections etc.
        """
        raise NotImplementedError

class HistoryDict(HistoryStorageInterface):
    """
    Very minimal history backend storing data in memory using a Python dictionary
    """
    def __init__(self):
        self._datachanges = {}
        self._datachanges_period = {}
        self._events = {}

    def new_historized_node(self, node_id, period, count=0):
        self._datachanges[node_id] = []
        self._datachanges_period[node_id] = period, count

    def save_node_value(self, node_id, datavalue):
        data = self._datachanges[node_id]
        period, count = self._datachanges_period[node_id]
        data.append(datavalue)
        now = datetime.utcnow()
        if period:
            # drop values older than the configured period
            while now - data[0].ServerTimestamp > period:
                data.pop(0)
        if count and len(data) > count:
            # keep only the most recent 'count' values; reassign the stored
            # list so the trimming actually takes effect
            self._datachanges[node_id] = data[-count:]

    def read_node_history(self, node_id, start, end, nb_values):
        cont = None
        if node_id not in self._datachanges:
            print("Error: attempt to read history for a node which is not historized")
            return [], cont
        else:
            if start is None:
                start = ua.DateTimeMinValue
            if end is None:
                end = ua.DateTimeMinValue
            if start == ua.DateTimeMinValue:
                results = [dv for dv in reversed(self._datachanges[node_id]) if start <= dv.ServerTimestamp]
            elif end == ua.DateTimeMinValue:
                results = [dv for dv in self._datachanges[node_id] if start <= dv.ServerTimestamp]
            elif start > end:
                results = [dv for dv in reversed(self._datachanges[node_id]) if end <= dv.ServerTimestamp <= start]
            else:
                results = [dv for dv in self._datachanges[node_id] if start <= dv.ServerTimestamp <= end]
            if nb_values and len(results) > nb_values:
                # continuation point is the timestamp of the first value not returned
                cont = results[nb_values].ServerTimestamp
                results = results[:nb_values]
            return results, cont

    def new_historized_event(self, source_id, etype, period):
        # store events per source node, consistent with the dict created in __init__
        self._events[source_id] = []

    def save_event(self, event):
        raise NotImplementedError

    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        raise NotImplementedError

    def stop(self):
        pass


class SubHandler(object):
    """
    Internal subscription handler forwarding data change and event notifications
    to the active storage backend
    """
    def __init__(self, storage):
        self.storage = storage

    def datachange_notification(self, node, val, data):
        self.storage.save_node_value(node.nodeid, data.monitored_item.Value)

    def event_notification(self, event):
        self.storage.save_event(event)


class HistoryManager(object):
    def __init__(self, iserver):
        self.logger = logging.getLogger(__name__)
        self.iserver = iserver
        self.storage = HistoryDict()
        self._sub = None
        self._handlers = {}

    def set_storage(self, storage):
        """
        Set the desired HistoryStorageInterface which the History Manager will use for historizing
        """
        self.storage = storage

    def _create_subscription(self, handler):
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = 10
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 0
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.iserver.isession, params, handler)

    def historize_data_change(self, node, period=timedelta(days=7), count=0):
        """
        Subscribe to the node's data changes and store the data in the active storage
        """
        if not self._sub:
            self._sub = self._create_subscription(SubHandler(self.storage))
        if node in self._handlers:
            raise ua.UaError("Node {} is already historized".format(node))
        self.storage.new_historized_node(node.nodeid, period, count)
        handler = self._sub.subscribe_data_change(node)
        self._handlers[node] = handler

    def historize_event(self, source, period=timedelta(days=7)):
        """
        Subscribe to the source node's events and store the data in the active storage;
        custom event properties are included
        """
        if not self._sub:
            self._sub = self._create_subscription(SubHandler(self.storage))
        if source in self._handlers:
            raise ua.UaError("Events from {} are already historized".format(source))

        # get the event types the source node generates and a list of all possible event fields
        event_types, ev_fields = self._get_source_event_data(source)

        self.storage.new_historized_event(source.nodeid, ev_fields, period)

        handler = self._sub.subscribe_events(source)  # FIXME supply list of event types when master is fixed
        self._handlers[source] = handler

    def dehistorize(self, node):
        """
        Remove the subscription to the node/source which is being historized
        """
        if node in self._handlers:
            self._sub.unsubscribe(self._handlers[node])
            del self._handlers[node]
        else:
            self.logger.error("History Manager isn't subscribed to %s", node)

    def read_history(self, params):
        """
        Read history for a node
        This is part of the AttributeService, but implemented as its own service
        since it requires more logic than other attribute service methods
        """
        results = []

        for rv in params.NodesToRead:
            res = self._read_history(params.HistoryReadDetails, rv)
            results.append(res)
        return results

    def _read_history(self, details, rv):
        """
        Determine whether the history read is for data changes or events, then read the history for that node
        """
        result = ua.HistoryReadResult()
        if isinstance(details, ua.ReadRawModifiedDetails):
            if details.IsReadModified:
                result.HistoryData = ua.HistoryModifiedData()
                # we do not support modified history by design so we return what we have
            else:
                result.HistoryData = ua.HistoryData()
            dv, cont = self._read_datavalue_history(rv, details)
            result.HistoryData.DataValues = dv
            result.ContinuationPoint = cont

        elif isinstance(details, ua.ReadEventDetails):
            result.HistoryData = ua.HistoryEvent()
            # FIXME: filter is a cumbersome type, maybe transform it into something easier
            # to handle for storage
            ev, cont = self._read_event_history(rv, details)
            result.HistoryData.Events = ev
            result.ContinuationPoint = cont

        else:
            # we do not currently support the other types, clients can process data themselves
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
        return result

    def _read_datavalue_history(self, rv, details):
        starttime = details.StartTime
        if rv.ContinuationPoint:
            # Spec says we should ignore details if a continuation point is present,
            # but it also says we can use the continuation point as a timestamp to enable
            # a stateless implementation. This is contradictory, so we assume details is
            # sent correctly along with the continuation point
            # starttime = bytes_to_datetime(rv.ContinuationPoint)
            starttime = ua.unpack_datetime(utils.Buffer(rv.ContinuationPoint))

        dv, cont = self.storage.read_node_history(rv.NodeId,
                                                  starttime,
                                                  details.EndTime,
                                                  details.NumValuesPerNode)
        if cont:
            # cont = datetime_to_bytes(dv[-1].ServerTimestamp)
            cont = ua.pack_datetime(cont)
        # rv.IndexRange
        # rv.DataEncoding  # xml or binary, spec seems to say we can ignore that one
        return dv, cont

    def _read_event_history(self, rv, details):
        starttime = details.StartTime
        if rv.ContinuationPoint:
            # Spec says we should ignore details if a continuation point is present,
            # but it also says we can use the continuation point as a timestamp to enable
            # a stateless implementation. This is contradictory, so we assume details is
            # sent correctly along with the continuation point
            # starttime = bytes_to_datetime(rv.ContinuationPoint)
            starttime = ua.unpack_datetime(utils.Buffer(rv.ContinuationPoint))

        ev, cont = self.storage.read_event_history(rv.NodeId,
                                                   starttime,
                                                   details.EndTime,
                                                   details.NumValuesPerNode,
                                                   details.Filter)
        if cont:
            # cont = datetime_to_bytes(dv[-1].ServerTimestamp)
            cont = ua.pack_datetime(cont)
        return ev, cont

    def _get_source_event_data(self, source):
        # get all event types which the source node can generate and the fields of those event types
        event_types = source.get_referenced_nodes(ua.ObjectIds.GeneratesEvent)

        ev_aggregate_fields = []
        for event_type in event_types:
            ev_aggregate_fields.extend(subscription.get_event_properties_from_type_node(event_type))

        ev_fields = []
        for field in set(ev_aggregate_fields):
            ev_fields.append(field.get_display_name().Text.decode(encoding='utf-8'))
        return event_types, ev_fields

    def update_history(self, params):
        """
        Update history for a node
        This is part of the AttributeService, but implemented as its own service
        since it requires more logic than other attribute service methods
        """
        results = []
        for _ in params.HistoryUpdateDetails:
            result = ua.HistoryUpdateResult()
            # we do not accept rewriting history
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotWritable)
            # append the individual result, not the results list itself
            results.append(result)
        return results

    def stop(self):
        """
        Call the stop method of the active storage interface when the server is stopped
        """
        self.storage.stop()
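For context, a minimal usage sketch of the classes above. This is hypothetical: iserver, node and params stand in for an internal server, a variable node and a client HistoryRead request that would be set up elsewhere, and the module's names (HistoryManager, HistoryDict, timedelta) are assumed to be in scope.

    # Hypothetical setup, assuming iserver, node and params already exist elsewhere
    manager = HistoryManager(iserver)
    manager.set_storage(HistoryDict())  # in-memory backend defined in this module
    manager.historize_data_change(node, period=timedelta(hours=1), count=100)

    # later, when a client issues a HistoryRead request
    results = manager.read_history(params)  # params carries NodesToRead and HistoryReadDetails

    manager.dehistorize(node)
    manager.stop()  # lets the storage backend close its resources

A custom backend (for example one writing to a database) would subclass HistoryStorageInterface, implement its methods, and be installed with set_storage() before any node is historized.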