Completed
Pull Request — master (#158)
by Olivier
02:49
created

HistoryDict.save_node_value()   B

Complexity

Conditions 5

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 5.025
Metric Value
cc 5
dl 0
loc 10
rs 8.5454
ccs 9
cts 10
cp 0.9
crap 5.025
1 1
from datetime import timedelta
2 1
from datetime import datetime
3
4 1
from opcua import Subscription
5 1
from opcua import ua
6 1
from opcua.common import utils
7
8
9 1
class HistoryStorageInterface(object):
10
11
    """
12
    Interface of a history backend.
13
    Must be implemented by backends
14
    """
15
16 1
    def new_historized_node(self, node_id, period, count=0):
17
        """
18
        Called when a new node is to be historized
19
        Returns None
20
        """
21
        raise NotImplementedError
22
23 1
    def save_node_value(self, node_id, datavalue):
24
        """
25
        Called when the value of a historized node has changed and should be saved in history
26
        Returns None
27
        """
28
        raise NotImplementedError
29
30 1
    def read_node_history(self, node_id, start, end, nb_values):
31
        """
32
        Called when a client make a history read request for a node
33
        if start or end is missing then nb_values is used to limit query
34
        nb_values is the max number of values to read. Ignored if 0
35
        Start time and end time are inclusive
36
        Returns a list of DataValues and a continuation point which
37
        is None if all nodes are read or the ServerTimeStamp of the last rejected DataValue
38
        """
39
        raise NotImplementedError
40
41 1
    def new_historized_event(self, event, period):
42
        """
43
        Called when historization of events is enabled on server side
44
        FIXME: we may need to store events per nodes in future...
45
        Returns None
46
        """
47
        raise NotImplementedError
48
49 1
    def save_event(self, event):
50
        """
51
        Called when a new event has been generated ans should be saved in history
52
        Returns None
53
        """
54
        raise NotImplementedError
55
56 1
    def read_event_history(self, start, end, evfilter):
57
        """
58
        Called when a client make a history read request for events
59
        Start time and end time are inclusive
60
        Returns a list of Events and a continuation point which
61
        is None if all events are read or the ServerTimeStamp of the last rejected event
62
        """
63
        raise NotImplementedError
64
65 1
    def stop(self):
66
        """
67
        Called when the server shuts down
68
        Can be used to close database connections etc.
69
        """
70
        raise NotImplementedError
71
72
73 1
class HistoryDict(HistoryStorageInterface):
74
    """
75
    very minimal history backend storing data in memory using a Python dictionary
76
    """
77 1
    def __init__(self):
78 1
        self._datachanges = {}
79 1
        self._datachanges_period = {}
80 1
        self._events = {}
81
82 1
    def new_historized_node(self, node_id, period, count=0):
83 1
        self._datachanges[node_id] = []
84 1
        self._datachanges_period[node_id] = period, count
85
86 1
    def save_node_value(self, node_id, datavalue):
87 1
        data = self._datachanges[node_id]
88 1
        period, count = self._datachanges_period[node_id]
89 1
        data.append(datavalue)
90 1
        now = datetime.utcnow()
91 1
        if period:
92 1
            while now - data[0].ServerTimestamp > period:
93
                data.pop(0)
94 1
        if count and len(data) > count:
95 1
            data = data[-count:]
96
97 1
    def read_node_history(self, node_id, start, end, nb_values):
98 1
        cont = None
99 1
        if node_id not in self._datachanges:
100
            print("Error attempt to read history for a node which is not historized")
101
            return [], cont
102
        else:
103 1
            if start is None:
104
                start = ua.DateTimeMinValue
105 1
            if end is None:
106
                end = ua.DateTimeMinValue
107 1
            if start == ua.DateTimeMinValue:
108 1
                results = [dv for dv in reversed(self._datachanges[node_id]) if start <= dv.ServerTimestamp]
109 1
            elif end == ua.DateTimeMinValue:
110 1
                results = [dv for dv in self._datachanges[node_id] if start <= dv.ServerTimestamp]
111 1
            elif start > end:
112 1
                results = [dv for dv in reversed(self._datachanges[node_id]) if end <= dv.ServerTimestamp <= start]
113
114
            else:
115 1
                results = [dv for dv in self._datachanges[node_id] if start <= dv.ServerTimestamp <= end]
116 1
            if nb_values and len(results) > nb_values:
117 1
                cont = results[nb_values + 1].ServerTimestamp
118 1
                results = results[:nb_values]
119 1
            return results, cont
120
121 1
    def new_historized_event(self, event, period):
122
        self._events = []
123
124 1
    def save_event(self, event):
125
        raise NotImplementedError
126
127 1
    def read_event_history(self, start, end, evfilter):
128
        raise NotImplementedError
129
130 1
    def stop(self):
131 1
        pass
132
133
134 1
class SubHandler(object):
135 1
    def __init__(self, storage):
136 1
        self.storage = storage
137
138 1
    def datachange_notification(self, node, val, data):
139 1
        self.storage.save_node_value(node.nodeid, data.monitored_item.Value)
140
141 1
    def event_notification(self, event):
142
        self.storage.save_event(event)
143
144
145 1
class HistoryManager(object):
146 1
    def __init__(self, iserver):
147 1
        self.iserver = iserver
148 1
        self.storage = HistoryDict()
149 1
        self._sub = None
150 1
        self._handlers = {}
151
152 1
    def set_storage(self, storage):
153 1
        self.storage = storage
154
155 1
    def _create_subscription(self, handler):
156 1
        params = ua.CreateSubscriptionParameters()
157 1
        params.RequestedPublishingInterval = 10
158 1
        params.RequestedLifetimeCount = 3000
159 1
        params.RequestedMaxKeepAliveCount = 10000
160 1
        params.MaxNotificationsPerPublish = 0
161 1
        params.PublishingEnabled = True
162 1
        params.Priority = 0
163 1
        return Subscription(self.iserver.isession, params, handler)
164
165 1
    def historize(self, node, period=timedelta(days=7), count=0):
166 1
        if not self._sub:
167 1
            self._sub = self._create_subscription(SubHandler(self.storage))
168 1
        if node in self._handlers:
169
            raise ua.UaError("Node {} is already historized".format(node))
170 1
        self.storage.new_historized_node(node.nodeid, period, count)
171 1
        handler = self._sub.subscribe_data_change(node)
172 1
        self._handlers[node] = handler
173
174 1
    def dehistorize(self, node):
175 1
        self._sub.unsubscribe(self._handlers[node])
176 1
        del(self._handlers[node])
177
178 1
    def read_history(self, params):
179
        """
180
        Read history for a node
181
        This is the part AttributeService, but implemented as its own service
182
        since it requires more logic than other attribute service methods
183
        """
184 1
        results = []
185
        
186 1
        for rv in params.NodesToRead:
187 1
            res = self._read_history(params.HistoryReadDetails, rv)
188 1
            results.append(res)
189 1
        return results
190
        
191 1
    def _read_history(self, details, rv):
192
        """
193
        read history for a node
194
        """
195 1
        result = ua.HistoryReadResult()
196 1
        if isinstance(details, ua.ReadRawModifiedDetails):
197 1
            if details.IsReadModified:
198
                result.HistoryData = ua.HistoryModifiedData()
199
                # we do not support modified history by design so we return what we have
200
            else:
201 1
                result.HistoryData = ua.HistoryData()
202 1
            dv, cont = self._read_datavalue_history(rv, details)
203 1
            result.HistoryData.DataValues = dv
204 1
            result.ContinuationPoint = cont
205
206
        elif isinstance(details, ua.ReadEventDetails):
207
            result.HistoryData = ua.HistoryEvent()
208
            # FIXME: filter is a cumbersome type, maybe transform it something easier
209
            # to handle for storage
210
            result.HistoryData.Events = self.storage.read_event_history(details.StartTime,
211
                                                                        details.EndTime,
212
                                                                        details.Filter)
213
        else:
214
            # we do not currently support the other types, clients can process data themselves
215
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
216 1
        return result
217
218 1
    def _read_datavalue_history(self, rv, details):
219 1
        starttime = details.StartTime
220 1
        if rv.ContinuationPoint:
221
            # Spec says we should ignore details if cont point is present
222
            # but they also say we can use cont point as timestamp to enable stateless
223
            # implementation. This is contradictory, so we assume details is
224
            # send correctly with continuation point
225
            #starttime = bytes_to_datetime(rv.ContinuationPoint)
226
            starttime = ua.unpack_datetime(utils.Buffer(rv.ContinuationPoint))
227
228 1
        dv, cont = self.storage.read_node_history(rv.NodeId,
229
                                                  starttime,
230
                                                  details.EndTime,
231
                                                  details.NumValuesPerNode)
232 1
        if cont:
233
            # cont = datetime_to_bytes(dv[-1].ServerTimestamp)
234 1
            cont = ua.pack_datetime(dv[-1].ServerTimestamp)
235
        # FIXME, parse index range and filter out if necessary
236
        # rv.IndexRange
237
        # rv.DataEncoding # xml or binary, seems spec say we can ignore that one
238 1
        return dv, cont
239
240 1
    def update_history(self, params):
241
        """
242
        Update history for a node
243
        This is the part AttributeService, but implemented as its own service
244
        since it requires more logic than other attribute service methods
245
        """
246
        results = []
247
        for _ in params.HistoryUpdateDetails:
248
            result = ua.HistoryUpdateResult()
249
            # we do not accept to rewrite history
250
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotWritable)
251
            results.append(results)
252
        return results
253
254 1
    def stop(self):
255
        self.storage.stop()
256