Completed: Pull Request against master (#180), created by unknown at 04:14

HistoryManager._read_history() (rated B)

Complexity:     Conditions 4
Size:           Total Lines 27
Duplication:    Lines 0, Ratio 0 %
Code Coverage:  Tests 6, CRAP Score 5.5021
Importance:     Changes 3, Bugs 0, Features 0
Metric   Value
cc       4
c        3
b        0
f        0
dl       0
loc      27
ccs      6
cts      11
cp       0.5455
crap     5.5021
rs       8.5806
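
For reference, the reported CRAP score can be roughly cross-checked against the complexity and coverage values above using the standard formula CRAP(m) = comp(m)^2 * (1 - cov(m))^3 + comp(m). This is only a sanity check and assumes cp is the statement-coverage fraction ccs / cts; the report itself does not state its exact formula.

# Rough cross-check of the CRAP score. Assumptions (not taken from the
# report): the standard CRAP formula is used, and cp = ccs / cts.
comp = 4            # cc (cyclomatic complexity)
cov = 6 / 11.0      # ccs / cts ~= 0.5455 (cp)
crap = comp ** 2 * (1 - cov) ** 3 + comp
print(round(crap, 4))   # ~5.5026, in line with the reported 5.5021

The small difference from 5.5021 presumably comes from rounding or from how the tool measures coverage internally.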
from datetime import timedelta
from datetime import datetime

from opcua import Subscription
from opcua import ua
from opcua.common import utils
from opcua.common import subscription


class HistoryStorageInterface(object):
    """
    Interface of a history backend.
    Must be implemented by backends
    """

    def new_historized_node(self, node_id, period, count=0):
        """
        Called when a new node is to be historized
        Returns None
        """
        raise NotImplementedError

    def save_node_value(self, node_id, datavalue):
        """
        Called when the value of a historized node has changed and should be saved in history
        Returns None
        """
        raise NotImplementedError

    def read_node_history(self, node_id, start, end, nb_values):
        """
        Called when a client makes a history read request for a node.
        If start or end is missing, nb_values is used to limit the query.
        nb_values is the maximum number of values to read; it is ignored if 0.
        Start time and end time are inclusive.
        Returns a list of DataValues and a continuation point, which is None
        if all values are read, or else the ServerTimestamp of the last rejected DataValue.
        """
        raise NotImplementedError

    def new_historized_event(self, source_id, etype, period):
        """
        Called when historization of events is enabled on the server side.
        FIXME: we may need to store events per node in the future...
        Returns None
        """
        raise NotImplementedError

    def save_event(self, event):
        """
        Called when a new event has been generated and should be saved in history
        Returns None
        """
        raise NotImplementedError

    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        """
        Called when a client makes a history read request for events.
        Start time and end time are inclusive.
        Returns a list of Events and a continuation point, which is None
        if all events are read, or else the ServerTimestamp of the last rejected event.
        """
        raise NotImplementedError

    def stop(self):
        """
        Called when the server shuts down.
        Can be used to close database connections etc.
        """
        raise NotImplementedError


class HistoryDict(HistoryStorageInterface):
    """
    Very minimal history backend storing data in memory using a Python dictionary
    """
    def __init__(self):
        self._datachanges = {}
        self._datachanges_period = {}
        self._events = {}

    def new_historized_node(self, node_id, period, count=0):
        self._datachanges[node_id] = []
        self._datachanges_period[node_id] = period, count

    def save_node_value(self, node_id, datavalue):
        data = self._datachanges[node_id]
        period, count = self._datachanges_period[node_id]
        data.append(datavalue)
        now = datetime.utcnow()
        if period:
            # discard values older than the configured retention period
            while now - data[0].ServerTimestamp > period:
                data.pop(0)
        if count and len(data) > count:
            # keep only the newest `count` values; assign back to the dict,
            # otherwise the trimmed list would be lost
            self._datachanges[node_id] = data[-count:]

    def read_node_history(self, node_id, start, end, nb_values):
        cont = None
        if node_id not in self._datachanges:
            print("Error: attempt to read history for a node which is not historized")
            return [], cont
        else:
            if start is None:
                start = ua.DateTimeMinValue
            if end is None:
                end = ua.DateTimeMinValue
            if start == ua.DateTimeMinValue:
                results = [dv for dv in reversed(self._datachanges[node_id]) if start <= dv.ServerTimestamp]
            elif end == ua.DateTimeMinValue:
                results = [dv for dv in self._datachanges[node_id] if start <= dv.ServerTimestamp]
            elif start > end:
                results = [dv for dv in reversed(self._datachanges[node_id]) if end <= dv.ServerTimestamp <= start]
            else:
                results = [dv for dv in self._datachanges[node_id] if start <= dv.ServerTimestamp <= end]
            if nb_values and len(results) > nb_values:
                # the continuation point is the timestamp of the first value
                # that is not returned (results[nb_values + 1] would skip a
                # value and can raise IndexError)
                cont = results[nb_values].ServerTimestamp
                results = results[:nb_values]
            return results, cont

    def new_historized_event(self, source_id, etype, period):
        self._events = []

    def save_event(self, event):
        raise NotImplementedError

    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        raise NotImplementedError

    def stop(self):
        pass


class SubHandler(object):
    """
    Subscription handler that forwards data change and event notifications
    to the history storage backend.
    """
    def __init__(self, storage):
        self.storage = storage

    def datachange_notification(self, node, val, data):
        self.storage.save_node_value(node.nodeid, data.monitored_item.Value)

    def event_notification(self, event):
        self.storage.save_event(event)


class HistoryManager(object):
    def __init__(self, iserver):
        self.iserver = iserver
        self.storage = HistoryDict()
        self._sub = None
        self._handlers = {}

    def set_storage(self, storage):
        self.storage = storage

    def _create_subscription(self, handler):
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = 10
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 0
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.iserver.isession, params, handler)

    def historize_var(self, node, period=timedelta(days=7), count=0):
        if not self._sub:
            self._sub = self._create_subscription(SubHandler(self.storage))
        if node in self._handlers:
            raise ua.UaError("Node {} is already historized".format(node))
        self.storage.new_historized_node(node.nodeid, period, count)
        handler = self._sub.subscribe_data_change(node)
        self._handlers[node] = handler

    def historize_event(self, source, period=timedelta(days=7)):
        if not self._sub:
            self._sub = self._create_subscription(SubHandler(self.storage))
        if source in self._handlers:
            raise ua.UaError("Events from {} are already historized".format(source))

        # get the event types the source node generates and a list of all possible event fields
        event_types, ev_fields = self._get_source_event_data(source)

        self.storage.new_historized_event(source.nodeid, ev_fields, period)

        handler = self._sub.subscribe_events(source)  # FIXME: supply list of event types when master is fixed
        self._handlers[source] = handler

    def dehistorize(self, node):
        self._sub.unsubscribe(self._handlers[node])
        del self._handlers[node]

    def read_history(self, params):
        """
        Read history for a node.
        This is part of the AttributeService, but it is implemented as its own service
        since it requires more logic than other attribute service methods.
        """
        results = []

        for rv in params.NodesToRead:
            res = self._read_history(params.HistoryReadDetails, rv)
            results.append(res)
        return results

    def _read_history(self, details, rv):
        """
        Read history for a single node.
        """
        result = ua.HistoryReadResult()
        if isinstance(details, ua.ReadRawModifiedDetails):
            if details.IsReadModified:
                result.HistoryData = ua.HistoryModifiedData()
                # we do not support modified history by design, so we return what we have
            else:
                result.HistoryData = ua.HistoryData()
            dv, cont = self._read_datavalue_history(rv, details)
            result.HistoryData.DataValues = dv
            result.ContinuationPoint = cont

        elif isinstance(details, ua.ReadEventDetails):
            result.HistoryData = ua.HistoryEvent()
            # FIXME: filter is a cumbersome type, maybe transform it into something easier
            # to handle for storage
            ev, cont = self._read_event_history(rv, details)
            result.HistoryData.Events = ev
            result.ContinuationPoint = cont

        else:
            # we do not currently support the other types, clients can process data themselves
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
        return result

    def _read_datavalue_history(self, rv, details):
        starttime = details.StartTime
        if rv.ContinuationPoint:
            # The spec says we should ignore details if a continuation point is present,
            # but it also says we can use the continuation point as a timestamp to enable
            # a stateless implementation. This is contradictory, so we assume details
            # are sent correctly along with the continuation point.
            # starttime = bytes_to_datetime(rv.ContinuationPoint)
            starttime = ua.unpack_datetime(utils.Buffer(rv.ContinuationPoint))

        dv, cont = self.storage.read_node_history(rv.NodeId,
                                                  starttime,
                                                  details.EndTime,
                                                  details.NumValuesPerNode)
        if cont:
            # cont = datetime_to_bytes(dv[-1].ServerTimestamp)
            cont = ua.pack_datetime(cont)
        # rv.IndexRange
        # rv.DataEncoding  # xml or binary; the spec seems to say we can ignore this one
        return dv, cont

    def _read_event_history(self, rv, details):
        starttime = details.StartTime
        if rv.ContinuationPoint:
            # The spec says we should ignore details if a continuation point is present,
            # but it also says we can use the continuation point as a timestamp to enable
            # a stateless implementation. This is contradictory, so we assume details
            # are sent correctly along with the continuation point.
            # starttime = bytes_to_datetime(rv.ContinuationPoint)
            starttime = ua.unpack_datetime(utils.Buffer(rv.ContinuationPoint))

        ev, cont = self.storage.read_event_history(rv.NodeId,
                                                   starttime,
                                                   details.EndTime,
                                                   details.NumValuesPerNode,
                                                   details.Filter)
        if cont:
            # cont = datetime_to_bytes(dv[-1].ServerTimestamp)
            cont = ua.pack_datetime(cont)
        return ev, cont

    def _get_source_event_data(self, source):
        # get all event types which the source node can generate and the fields of those event types
        event_types = source.get_referenced_nodes(ua.ObjectIds.GeneratesEvent)

        ev_aggregate_fields = []
        for event_type in event_types:
            ev_aggregate_fields.extend(subscription.get_event_properties_from_type_node(event_type))

        ev_fields = []
        for field in set(ev_aggregate_fields):
            ev_fields.append(field.get_display_name().Text.decode(encoding='utf-8'))
        return event_types, ev_fields

    def update_history(self, params):
        """
        Update history for a node.
        This is part of the AttributeService, but it is implemented as its own service
        since it requires more logic than other attribute service methods.
        """
        results = []
        for _ in params.HistoryUpdateDetails:
            result = ua.HistoryUpdateResult()
            # we do not accept rewriting history
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotWritable)
            results.append(result)
        return results

    def stop(self):
        self.storage.stop()
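
For context, a minimal sketch of how this module might be used from server code. Only HistoryManager, HistoryStorageInterface, historize_var and set_storage come from the listing above; the surrounding server wiring (Server, set_endpoint, add_variable, iserver.history_manager) is assumed from the python-opcua API of this era and may differ between versions.

# Hypothetical usage sketch; the server-side entry points are assumptions.
from datetime import timedelta
from opcua import Server

server = Server()
server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
objects = server.get_objects_node()
myvar = objects.add_variable(2, "MyVar", 0.0)

server.start()
try:
    hm = server.iserver.history_manager          # assumed attribute on the internal server
    # a custom backend would subclass HistoryStorageInterface and be plugged in first:
    # hm.set_storage(MyCustomHistoryBackend())   # hypothetical backend class
    # keep one week of data changes for this node, capped at 1000 values
    hm.historize_var(myvar, period=timedelta(days=7), count=1000)
    myvar.set_value(42.0)                        # value changes are now recorded in history
finally:
    server.stop()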