Completed
Pull Request — master (#156)
by
unknown
02:35
created

_read_datavalue_history()   A

Complexity

Conditions 3

Size

Total Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3.0175
Metric Value
cc 3
dl 0
loc 21
ccs 7
cts 8
cp 0.875
crap 3.0175
rs 9.3142
1 1
from datetime import timedelta
2 1
from datetime import datetime
3
4 1
from opcua import Subscription
5 1
from opcua import ua
6
7
8 1
class HistoryStorageInterface(object):
9
10
    """
11
    Interface of a history backend.
12
    Must be implemented by backends
13
    """
14
15 1
    def new_historized_node(self, node_id, period, count=0):
16
        """
17
        Called when a new node is to be historized
18
        Returns None
19
        """
20
        raise NotImplementedError
21
22 1
    def save_node_value(self, node_id, datavalue):
23
        """
24
        Called when the value of a historized node has changed and should be saved in history
25
        Returns None
26
        """
27
        raise NotImplementedError
28
29 1
    def read_node_history(self, node_id, start, end, nb_values):
30
        """
31
        Called when a client make a history read request for a node
32
        if start or end is missing then nb_values is used to limit query
33
        nb_values is the max number of values to read. Ignored if 0
34
        Start time and end time are inclusive
35
        Returns a list of DataValues and a continuation point which
36
        is None if all nodes are read or the ServerTimeStamp of the last rejected DataValue
37
        """
38
        raise NotImplementedError
39
40 1
    def new_historized_event(self, event, period):
41
        """
42
        Called when historization of events is enabled on server side
43
        FIXME: we may need to store events per nodes in future...
44
        Returns None
45
        """
46
        raise NotImplementedError
47
48 1
    def save_event(self, event):
49
        """
50
        Called when a new event has been generated ans should be saved in history
51
        Returns None
52
        """
53
        raise NotImplementedError
54
55 1
    def read_event_history(self, start, end, evfilter):
56
        """
57
        Called when a client make a history read request for events
58
        Start time and end time are inclusive
59
        Returns a list of Events and a continuation point which
60
        is None if all events are read or the ServerTimeStamp of the last rejected event
61
        """
62
        raise NotImplementedError
63
64 1
    def stop(self):
65
        """
66
        Called when the server shuts down
67
        Can be used to close database connections etc.
68
        """
69
        raise NotImplementedError
70
71
72
# if you want to use an SQL based history uncomment this import and change the storage type of the history manager
73
# from opcua.server.history_sql import HistorySQLite
74
75
76 1
class HistoryDict(HistoryStorageInterface):
77
    """
78
    very minimal history backend storing data in memory using a Python dictionary
79
    """
80 1
    def __init__(self):
81 1
        self._datachanges = {}
82 1
        self._datachanges_period = {}
83 1
        self._events = {}
84
85 1
    def new_historized_node(self, node_id, period, count=0):
86 1
        self._datachanges[node_id] = []
87 1
        self._datachanges_period[node_id] = period, count
88
89 1
    def save_node_value(self, node_id, datavalue):
90 1
        data = self._datachanges[node_id]
91 1
        period, count = self._datachanges_period[node_id]
92 1
        data.append(datavalue)
93 1
        now = datetime.now()
94 1
        if period:
95 1
            while now - data[0].ServerTimestamp > period:
96
                data.pop(0)
97 1
        if count and len(data) > count:
98 1
            data = data[-count:]
99
100 1
    def read_node_history(self, node_id, start, end, nb_values):
101 1
        cont = None
102 1
        if node_id not in self._datachanges:
103
            print("Error attempt to read history for a node which is not historized")
104
            return [], cont
105
        else:
106 1
            if end is None:
107
                end = datetime.now() + timedelta(days=1)
108 1
            if start is None:
109
                start = ua.DateTimeMinValue
110 1
            results = [dv for dv in self._datachanges[node_id] if start <= dv.ServerTimestamp <= end]
111 1
            if nb_values:
112 1
                if start > ua.DateTimeMinValue and len(results) > nb_values:
113 1
                    cont = results[nb_values + 1].ServerTimestamp
114 1
                    results = results[:nb_values]
115
                else:
116 1
                    results = results[-nb_values:]
117 1
            return results, cont
118
119 1
    def new_historized_event(self, event, period):
120
        self._events = []
121
122 1
    def save_event(self, event):
123
        raise NotImplementedError
124
125 1
    def read_event_history(self, start, end, evfilter):
126
        raise NotImplementedError
127
128 1
    def stop(self):
129 1
        pass
130
131
132 1
class SubHandler(object):
133 1
    def __init__(self, storage):
134 1
        self.storage = storage
135
136 1
    def datachange_notification(self, node, val, data):
137 1
        self.storage.save_node_value(node.nodeid, data.monitored_item.Value)
138
139 1
    def event_notification(self, event):
140
        self.storage.save_event(event)
141
142
143 1
class HistoryManager(object):
144 1
    def __init__(self, iserver):
145 1
        self.iserver = iserver
146 1
        self.storage = HistoryDict()  # Change to HistorySQLite() for file based history
147 1
        self._sub = None
148 1
        self._handlers = {}
149
150 1
    def set_storage(self, storage):
151
        self.storage = storage
152
153 1
    def _create_subscription(self, handler):
154 1
        params = ua.CreateSubscriptionParameters()
155 1
        params.RequestedPublishingInterval = 10
156 1
        params.RequestedLifetimeCount = 3000
157 1
        params.RequestedMaxKeepAliveCount = 10000
158 1
        params.MaxNotificationsPerPublish = 0
159 1
        params.PublishingEnabled = True
160 1
        params.Priority = 0
161 1
        return Subscription(self.iserver.isession, params, handler)
162
163 1
    def historize(self, node, period=timedelta(days=7), count=0):
164 1
        if not self._sub:
165 1
            self._sub = self._create_subscription(SubHandler(self.storage))
166 1
        if node in self._handlers:
167
            raise ua.UaError("Node {} is already historized".format(node))
168 1
        self.storage.new_historized_node(node.nodeid, period, count)
169 1
        handler = self._sub.subscribe_data_change(node)
170 1
        self._handlers[node] = handler
171
172 1
    def dehistorize(self, node):
173 1
        self._sub.unsubscribe(self._handlers[node])
174 1
        del(self._handlers[node])
175
176 1
    def read_history(self, params):
177
        """
178
        Read history for a node
179
        This is the part AttributeService, but implemented as its own service
180
        since it requires more logic than other attribute service methods
181
        """
182 1
        results = []
183
        
184 1
        for rv in params.NodesToRead:
185 1
            res = self._read_history(params.HistoryReadDetails, rv)
186 1
            results.append(res)
187 1
        return results
188
        
189 1
    def _read_history(self, details, rv):
190
        """
191
        read history for a node
192
        """
193 1
        result = ua.HistoryReadResult()
194 1
        if isinstance(details, ua.ReadRawModifiedDetails):
195 1
            if details.IsReadModified:
196
                result.HistoryData = ua.HistoryModifiedData()
197
                # we do not support modified history by design so we return what we have
198
            else:
199 1
                result.HistoryData = ua.HistoryData()
200 1
            dv, cont = self._read_datavalue_history(rv, details)
201 1
            result.HistoryData.DataValues = dv
202 1
            result.ContinuationPoint = cont
203
204
        elif isinstance(details, ua.ReadEventDetails):
205
            result.HistoryData = ua.HistoryEvent()
206
            # FIXME: filter is a cumbersome type, maybe transform it something easier
207
            # to handle for storage
208
            result.HistoryData.Events = self.storage.read_event_history(details.StartTime,
209
                                                                        details.EndTime,
210
                                                                        details.Filter)
211
        else:
212
            # we do not currently support the other types, clients can process data themselves
213
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
214 1
        return result
215
216 1
    def _read_datavalue_history(self, rv, details):
217 1
        starttime = details.StartTime
218 1
        if rv.ContinuationPoint:
219
            # Spec says we should ignore details if cont point is present
220
            # but they also say we can use cont point as timestamp to enable stateless
221
            # implementation. This is contradictory, so we assume details is
222
            # send correctly with continuation point
223
            # starttime = bytes_to_datetime(rv.ContinuationPoint)
224
            starttime = ua.unpack_datetime(rv.ContinuationPoint)
225
226 1
        dv, cont = self.storage.read_node_history(rv.NodeId,
227
                                                  starttime,
228
                                                  details.EndTime,
229
                                                  details.NumValuesPerNode)
230 1
        if cont:
231
            # cont = datetime_to_bytes(dv[-1].ServerTimestamp)
232 1
            cont = ua.pack_datetime(dv[-1].ServerTimestamp)
233
        # FIXME, parse index range and filter out if necessary
234
        # rv.IndexRange
235
        # rv.DataEncoding # xml or binary, seems spec say we can ignore that one
236 1
        return dv, cont
237
238 1
    def update_history(self, params):
239
        """
240
        Update history for a node
241
        This is the part AttributeService, but implemented as its own service
242
        since it requires more logic than other attribute service methods
243
        """
244
        results = []
245
        for _ in params.HistoryUpdateDetails:
246
            result = ua.HistoryUpdateResult()
247
            # we do not accept to rewrite history
248
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotWritable)
249
            results.append(results)
250
        return results
251
252 1
    def stop(self):
253
        self.storage.stop()
254