Completed
Pull Request — master (#172)
by
unknown
06:10
created

HistoryManager.read_history()   A

Complexity

Conditions 2

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 2.0078

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 2
c 2
b 0
f 0
dl 0
loc 12
ccs 7
cts 8
cp 0.875
crap 2.0078
rs 9.4285
1 1
from datetime import timedelta
2 1
from datetime import datetime
3
4 1
from opcua import Subscription
5 1
from opcua import ua
6 1
from opcua.common import utils
7
8
9 1
class HistoryStorageInterface(object):
10
11
    """
12
    Interface of a history backend.
13
    Must be implemented by backends
14
    """
15
16 1
    def new_historized_node(self, node_id, period, count=0):
17
        """
18
        Called when a new node is to be historized
19
        Returns None
20
        """
21
        raise NotImplementedError
22
23 1
    def save_node_value(self, node_id, datavalue):
24
        """
25
        Called when the value of a historized node has changed and should be saved in history
26
        Returns None
27
        """
28
        raise NotImplementedError
29
30 1
    def read_node_history(self, node_id, start, end, nb_values):
31
        """
32
        Called when a client make a history read request for a node
33
        if start or end is missing then nb_values is used to limit query
34
        nb_values is the max number of values to read. Ignored if 0
35
        Start time and end time are inclusive
36
        Returns a list of DataValues and a continuation point which
37
        is None if all nodes are read or the ServerTimeStamp of the last rejected DataValue
38
        """
39
        raise NotImplementedError
40
41 1
    def new_historized_event(self, source_id, etype, period):
42
        """
43
        Called when historization of events is enabled on server side
44
        FIXME: we may need to store events per nodes in future...
45
        Returns None
46
        """
47
        raise NotImplementedError
48
49 1
    def save_event(self, event):
50
        """
51
        Called when a new event has been generated ans should be saved in history
52
        Returns None
53
        """
54
        raise NotImplementedError
55
56 1
    def read_event_history(self, source_id, start, end, nb_values, evfilter):
57
        """
58
        Called when a client make a history read request for events
59
        Start time and end time are inclusive
60
        Returns a list of Events and a continuation point which
61
        is None if all events are read or the ServerTimeStamp of the last rejected event
62
        """
63
        raise NotImplementedError
64
65 1
    def stop(self):
66
        """
67
        Called when the server shuts down
68
        Can be used to close database connections etc.
69
        """
70
        raise NotImplementedError
71
72
73 1
class HistoryDict(HistoryStorageInterface):
74
    """
75
    very minimal history backend storing data in memory using a Python dictionary
76
    """
77 1
    def __init__(self):
78 1
        self._datachanges = {}
79 1
        self._datachanges_period = {}
80 1
        self._events = {}
81
82 1
    def new_historized_node(self, node_id, period, count=0):
83 1
        self._datachanges[node_id] = []
84 1
        self._datachanges_period[node_id] = period, count
85
86 1
    def save_node_value(self, node_id, datavalue):
87 1
        data = self._datachanges[node_id]
88 1
        period, count = self._datachanges_period[node_id]
89 1
        data.append(datavalue)
90 1
        now = datetime.utcnow()
91 1
        if period:
92 1
            while now - data[0].ServerTimestamp > period:
93
                data.pop(0)
94 1
        if count and len(data) > count:
95 1
            data = data[-count:]
96
97 1
    def read_node_history(self, node_id, start, end, nb_values):
98 1
        cont = None
99 1
        if node_id not in self._datachanges:
100
            print("Error attempt to read history for a node which is not historized")
101
            return [], cont
102
        else:
103 1
            if start is None:
104
                start = ua.DateTimeMinValue
105 1
            if end is None:
106
                end = ua.DateTimeMinValue
107 1
            if start == ua.DateTimeMinValue:
108 1
                results = [dv for dv in reversed(self._datachanges[node_id]) if start <= dv.ServerTimestamp]
109 1
            elif end == ua.DateTimeMinValue:
110 1
                results = [dv for dv in self._datachanges[node_id] if start <= dv.ServerTimestamp]
111 1
            elif start > end:
112 1
                results = [dv for dv in reversed(self._datachanges[node_id]) if end <= dv.ServerTimestamp <= start]
113
114
            else:
115 1
                results = [dv for dv in self._datachanges[node_id] if start <= dv.ServerTimestamp <= end]
116 1
            if nb_values and len(results) > nb_values:
117 1
                cont = results[nb_values + 1].ServerTimestamp
118 1
                results = results[:nb_values]
119 1
            return results, cont
120
121 1
    def new_historized_event(self, source_id, etype, period):
122
        self._events = []
123
124 1
    def save_event(self, event):
125
        raise NotImplementedError
126
127 1
    def read_event_history(self, source_id, start, end, nb_values, evfilter):
128
        raise NotImplementedError
129
130 1
    def stop(self):
131 1
        pass
132
133
134 1
class SubHandler(object):
135 1
    def __init__(self, storage):
136 1
        self.storage = storage
137
138 1
    def datachange_notification(self, node, val, data):
139 1
        self.storage.save_node_value(node.nodeid, data.monitored_item.Value)
140
141 1
    def event_notification(self, event):
142
        self.storage.save_event(event)
143
144
145 1
class HistoryManager(object):
146 1
    def __init__(self, iserver):
147 1
        self.iserver = iserver
148 1
        self.storage = HistoryDict()
149 1
        self._sub = None
150 1
        self._handlers = {}
151
152 1
    def set_storage(self, storage):
153 1
        self.storage = storage
154
155 1
    def _create_subscription(self, handler):
156 1
        params = ua.CreateSubscriptionParameters()
157 1
        params.RequestedPublishingInterval = 10
158 1
        params.RequestedLifetimeCount = 3000
159 1
        params.RequestedMaxKeepAliveCount = 10000
160 1
        params.MaxNotificationsPerPublish = 0
161 1
        params.PublishingEnabled = True
162 1
        params.Priority = 0
163 1
        return Subscription(self.iserver.isession, params, handler)
164
165 1
    def historize(self, node, period=timedelta(days=7), count=0):
166 1
        if not self._sub:
167 1
            self._sub = self._create_subscription(SubHandler(self.storage))
168 1
        if node in self._handlers:
169
            raise ua.UaError("Node {} is already historized".format(node))
170 1
        self.storage.new_historized_node(node.nodeid, period, count)
171 1
        handler = self._sub.subscribe_data_change(node)
172 1
        self._handlers[node] = handler
173
174 1
    def historize_event(self, source, etype, period=timedelta(days=7)):
175 1
        if not self._sub:
176 1
            self._sub = self._create_subscription(SubHandler(self.storage))
177
        if source in self._handlers:
178 1
            raise ua.UaError("Events from {} are already historized".format(source))
179
        self.storage.new_historized_event(source.nodeid, etype, period)
180
        handler = self._sub.subscribe_events(source)
181
        self._handlers[source] = handler
182
183
    def dehistorize(self, node):
184 1
        self._sub.unsubscribe(self._handlers[node])
185
        del(self._handlers[node])
186 1
187 1
    def read_history(self, params):
188 1
        """
189 1
        Read history for a node
190
        This is the part AttributeService, but implemented as its own service
191 1
        since it requires more logic than other attribute service methods
192
        """
193
        results = []
194
        
195 1
        for rv in params.NodesToRead:
196 1
            res = self._read_history(params.HistoryReadDetails, rv)
197 1
            results.append(res)
198
        return results
199
        
200
    def _read_history(self, details, rv):
201 1
        """
202 1
        read history for a node
203 1
        """
204 1
        result = ua.HistoryReadResult()
205
        if isinstance(details, ua.ReadRawModifiedDetails):
206
            if details.IsReadModified:
207
                result.HistoryData = ua.HistoryModifiedData()
208
                # we do not support modified history by design so we return what we have
209
            else:
210
                result.HistoryData = ua.HistoryData()
211
            dv, cont = self._read_datavalue_history(rv, details)
212
            result.HistoryData.DataValues = dv
213
            result.ContinuationPoint = cont
214
215
        elif isinstance(details, ua.ReadEventDetails):
216 1
            result.HistoryData = ua.HistoryEvent()
217
            # FIXME: filter is a cumbersome type, maybe transform it something easier
218 1
            # to handle for storage
219 1
            ev, cont = self._read_event_history(rv, details)
220 1
            result.HistoryData.Events = ev
221
            result.ContinuationPoint = cont
222
223
        else:
224
            # we do not currently support the other types, clients can process data themselves
225
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
226
        return result
227
228 1
    def _read_datavalue_history(self, rv, details):
229
        starttime = details.StartTime
230
        if rv.ContinuationPoint:
231
            # Spec says we should ignore details if cont point is present
232 1
            # but they also say we can use cont point as timestamp to enable stateless
233
            # implementation. This is contradictory, so we assume details is
234 1
            # send correctly with continuation point
235
            #starttime = bytes_to_datetime(rv.ContinuationPoint)
236
            starttime = ua.unpack_datetime(utils.Buffer(rv.ContinuationPoint))
237
238 1
        dv, cont = self.storage.read_node_history(rv.NodeId,
239
                                                  starttime,
240 1
                                                  details.EndTime,
241
                                                  details.NumValuesPerNode)
242
        if cont:
243
            # cont = datetime_to_bytes(dv[-1].ServerTimestamp)
244
            cont = ua.pack_datetime(dv[-1].ServerTimestamp)  # FIXME pretty sure this isn't correct; should just pack cont itself, not dv[-1]
245
        # FIXME, parse index range and filter out if necessary
246
        # rv.IndexRange
247
        # rv.DataEncoding # xml or binary, seems spec say we can ignore that one
248
        return dv, cont
249
250
    def _read_event_history(self, rv, details):
251
        starttime = details.StartTime
252
        if rv.ContinuationPoint:
253
            # Spec says we should ignore details if cont point is present
254 1
            # but they also say we can use cont point as timestamp to enable stateless
255 1
            # implementation. This is contradictory, so we assume details is
256
            # send correctly with continuation point
257
            #starttime = bytes_to_datetime(rv.ContinuationPoint)
258
            starttime = ua.unpack_datetime(utils.Buffer(rv.ContinuationPoint))
259
260
        ev, cont = self.storage.read_event_history(rv.NodeId,
261
                                                   starttime,
262
                                                   details.EndTime,
263
                                                   details.NumValuesPerNode,
264
                                                   details.Filter)
265
        if cont:
266
            # cont = datetime_to_bytes(dv[-1].ServerTimestamp)
267
            cont = ua.pack_datetime(ev[-1].Time)  # FIXME pretty sure this isn't correct; should just pack cont itself, not ev[-1]
268
        return ev, cont
269
270
    def update_history(self, params):
271
        """
272
        Update history for a node
273
        This is the part AttributeService, but implemented as its own service
274
        since it requires more logic than other attribute service methods
275
        """
276
        results = []
277
        for _ in params.HistoryUpdateDetails:
278
            result = ua.HistoryUpdateResult()
279
            # we do not accept to rewrite history
280
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotWritable)
281
            results.append(results)
282
        return results
283
284
    def stop(self):
285
        self.storage.stop()
286