Completed
Push — master ( 15ed61...18100b )
by Olivier
03:06 queued 35s
created

opcua.server.HistoryDict   A

Complexity

Total Complexity 19

Size/Duplication

Total Lines 51
Duplicated Lines 0 %

Test Coverage

Coverage 80%
Metric Value
dl 0
loc 51
ccs 32
cts 40
cp 0.8
rs 10
wmc 19

7 Methods

Rating   Name   Duplication   Size   Complexity  
A __init__() 0 4 1
A read_event_history() 0 2 1
A new_historized_node() 0 3 1
C read_node_history() 0 18 9
B save_node_value() 0 10 5
A save_event() 0 2 1
A new_historized_event() 0 2 1
1 1
from datetime import timedelta
2 1
from datetime import datetime
3
4 1
from opcua import Subscription
5 1
from opcua import ua
6
7
8 1
class HistoryStorageInterface(object):
9
10
    """
11
    Interface of a history backend.
12
    Must be implemented by backends
13
    """
14
15 1
    def new_historized_node(self, node, period, count=0):
16
        """
17
        Called when a new node is to be historized
18
        Returns None
19
        """
20
        raise NotImplementedError
21
22 1
    def save_node_value(self, node, datavalue):
23
        """
24
        Called when the value of a historized node has changed and should be saved in history
25
        Returns None
26
        """
27
        raise NotImplementedError
28
29 1
    def read_node_history(self, node, start, end, nb_values):
30
        """
31
        Called when a client make a history read request for a node
32
        if start or end is missing then nb_values is used to limit query
33
        nb_values is the max number of values to read. Ignored if 0
34
        Start time and end time are inclusive
35
        Returns a list of DataValues and a continuation point which
36
        is None if all nodes are read or the ServerTimeStamp of the last rejected DataValue
37
        """
38
        raise NotImplementedError
39
40 1
    def new_historized_event(self, event, period):
41
        """
42
        Called when historization of events is enabled on server side
43
        FIXME: we may need to store events per nodes in future...
44
        Returns None
45
        """
46
        raise NotImplementedError
47
48 1
    def save_event(self, event):
49
        """
50
        Called when a new event has been generated ans should be saved in history
51
        Returns None
52
        """
53
        raise NotImplementedError
54
55 1
    def read_event_history(self, start, end, evfilter):
56
        """
57
        Called when a client make a history read request for events
58
        Start time and end time are inclusive
59
        Returns a list of Events and a continuation point which
60
        is None if all events are read or the ServerTimeStamp of the last rejected event
61
        """
62
        raise NotImplementedError
63
64
65 1
class HistoryDict(HistoryStorageInterface):
66
    """
67
    very minimal history backend storing data in memory using a Python dictionnary
68
    """
69 1
    def __init__(self):
70 1
        self._datachanges = {}
71 1
        self._datachanges_period = {}
72 1
        self._events = {}
73
74 1
    def new_historized_node(self, node, period, count=0):
75 1
        self._datachanges[node] = []
76 1
        self._datachanges_period[node] = period, count
77
78 1
    def new_historized_event(self, event, period):
79
        self._events = []
80
81 1
    def save_node_value(self, node, datavalue):
82 1
        data = self._datachanges[node]
83 1
        period, count = self._datachanges_period[node]
84 1
        data.append(datavalue)
85 1
        now = datetime.now()
86 1
        if period:
87 1
            while now - data[0].ServerTimestamp > period:
88
                data.pop(0)
89 1
        if count and len(data) > count:
90 1
            data = data[-count:]
91
92 1
    def read_node_history(self, node, start, end, nb_values):
93 1
        cont = None
94 1
        if node not in self._datachanges:
95
            print("Error attempt to read history for a node which is not historized")
96
            return [], cont
97
        else:
98 1
            if end is None:
99
                end = datetime.now() + timedelta(days=1)
100 1
            if start is None:
101
                start = ua.DateTimeMinValue
102 1
            results = [dv for dv in self._datachanges[node] if start <= dv.ServerTimestamp <= end]
103 1
            if nb_values:
104 1
                if start > ua.DateTimeMinValue and len(results) > nb_values:
105 1
                    cont = results[nb_values + 1].ServerTimestamp
106 1
                    results = results[:nb_values]
107
                else:
108 1
                    results = results[-nb_values:]
109 1
            return results, cont
110
111 1
    def save_event(self, event):
112
        raise NotImplementedError
113
114 1
    def read_event_history(self, start, end, evfilter):
115
        raise NotImplementedError
116
117
118 1
class SubHandler(object):
119 1
    def __init__(self, storage):
120 1
        self.storage = storage
121
122 1
    def datachange_notification(self, node, val, data):
123 1
        self.storage.save_node_value(node.nodeid, data.monitored_item.Value)
124
125
126 1
    def event_notification(self, event):
127
        self.storage.save_event(event)
128
129
130 1
class HistoryManager(object):
131 1
    def __init__(self, iserver):
132 1
        self.iserver = iserver
133 1
        self.storage = HistoryDict()
134 1
        self._sub = None
135 1
        self._handlers = {}
136
137 1
    def set_storage(self, storage):
138
        self.storage = storage
139
140 1
    def _create_subscription(self, handler):
141 1
        params = ua.CreateSubscriptionParameters()
142 1
        params.RequestedPublishingInterval = 10
143 1
        params.RequestedLifetimeCount = 3000
144 1
        params.RequestedMaxKeepAliveCount = 10000
145 1
        params.MaxNotificationsPerPublish = 0
146 1
        params.PublishingEnabled = True
147 1
        params.Priority = 0
148 1
        return Subscription(self.iserver.isession, params, handler)
149
150 1
    def historize(self, node, period=timedelta(days=7), count=0):
151 1
        if not self._sub:
152 1
            self._sub = self._create_subscription(SubHandler(self.storage))
153 1
        if node in self._handlers:
154
            raise ua.UaError("Node {} is allready historized".format(node))
155 1
        self.storage.new_historized_node(node.nodeid, period, count)
156 1
        handler = self._sub.subscribe_data_change(node)
157 1
        self._handlers[node] = handler
158
159 1
    def dehistorize(self, node):
160 1
        self._sub.unsubscribe(self._handlers[node])
161 1
        del(self._handlers[node])
162
163 1
    def read_history(self, params):
164
        """
165
        Read history for a node
166
        This is the part AttributeService, but implemented as its own service
167
        since it requires more logic than other attribute service methods
168
        """
169 1
        results = []
170
        
171 1
        for rv in params.NodesToRead:
172 1
            res = self._read_history(params.HistoryReadDetails, rv)
173 1
            results.append(res)
174 1
        return results
175
        
176 1
    def _read_history(self, details, rv):
177
        """ read history for a node 
178
        """
179 1
        result = ua.HistoryReadResult()
180 1
        if isinstance(details, ua.ReadRawModifiedDetails):
181 1
            if details.IsReadModified:
182
                result.HistoryData = ua.HistoryModifiedData()
183
                # we do not support modified history by design so we return what we have
184
            else:
185 1
                result.HistoryData = ua.HistoryData()
186 1
            dv, cont = self._read_datavalue_history(rv, details)
187 1
            result.HistoryData.DataValues = dv
188 1
            result.ContinuationPoint = cont
189
190
        elif isinstance(details, ua.ReadEventDetails):
191
            result.HistoryData = ua.HistoryEvent()
192
            # FIXME: filter is a cumbersome type, maybe transform it something easier
193
            # to handle for storage
194
            result.HistoryData.Events = self.storage.read_event_history(details.StartTime,
195
                                                                        details.EndTime,
196
                                                                        details.Filter)
197
        else:
198
            # we do not currently support the other types, clients can process data themselves
199
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
200 1
        return result
201
202 1
    def _read_datavalue_history(self, rv, details):
203 1
        starttime = details.StartTime
204 1
        if rv.ContinuationPoint:
205
            # Spec says we should ignore details if cont point is present
206
            # but they also say we can use cont point as timestamp to enable stateless
207
            # implementation. This is contradictory, so we assume details is
208
            # send correctly with continuation point
209
            #starttime = bytes_to_datetime(rv.ContinuationPoint)
210
            starttime = ua.unpack_datetime(rv.ContinuationPoint)
211
212 1
        dv, cont = self.storage.read_node_history(rv.NodeId,
213
                                                  starttime,
214
                                                  details.EndTime,
215
                                                  details.NumValuesPerNode)
216 1
        if cont:
217
            #cont = datetime_to_bytes(dv[-1].ServerTimestamp)
218 1
            cont = ua.pack_datetime(dv[-1].ServerTimestamp)
219
        # FIXME, parse index range and filter out if necesary
220
        # rv.IndexRange
221
        # rv.DataEncoding # xml or binary, seems spec say we can ignore that one
222 1
        return dv, cont
223
224 1
    def update_history(self, params):
225
        """
226
        Update history for a node
227
        This is the part AttributeService, but implemented as its own service
228
        since it requires more logic than other attribute service methods
229
        """
230
        results = []
231
        for _ in params.HistoryUpdateDetails:
232
            result = ua.HistoryUpdateResult()
233
            # we do not accept to rewrite history
234
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotWritable)
235
            results.append(results)
236
        return results
237
238
239
240
241