Completed
Pull Request — master (#189)
by
unknown
03:24
created

HistoryManager.update_history()   A

Complexity

Conditions 2

Size

Total Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 2.1481

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 2
c 2
b 0
f 0
dl 0
loc 13
ccs 2
cts 3
cp 0.6667
crap 2.1481
rs 9.4285
1 1
import logging
2 1
from datetime import timedelta
3 1
from datetime import datetime
4
5 1
from opcua import Subscription
6 1
from opcua import ua
7 1
from opcua.common import utils
8 1
from opcua.common import subscription
9
10
11 1
class HistoryStorageInterface(object):
    """
    Interface of a history backend.

    Must be implemented by backends. Every method of this base class raises
    NotImplementedError.
    """

    def new_historized_node(self, node_id, period, count=0):
        """
        Called when a new node is to be historized.

        node_id identifies the node, period and count describe how much
        history should be kept (HistoryManager passes a timedelta as period
        by default; count defaults to 0).
        Returns None
        """
        raise NotImplementedError

    def save_node_value(self, node_id, datavalue):
        """
        Called when the value of a historized node has changed and should be saved in history.

        Returns None
        """
        raise NotImplementedError

    def read_node_history(self, node_id, start, end, nb_values):
        """
        Called when a client makes a history read request for a node.

        If start or end is missing then nb_values is used to limit the query.
        nb_values is the max number of values to read. Ignored if 0.
        Start time and end time are inclusive.
        Returns a list of DataValues and a continuation point which
        is None if all nodes are read or the ServerTimeStamp of the last rejected DataValue
        """
        raise NotImplementedError

    def new_historized_event(self, source_id, etype, period):
        """
        Called when historization of events is enabled on server side.

        FIXME: we may need to store events per nodes in future...
        Returns None
        """
        raise NotImplementedError

    def save_event(self, event):
        """
        Called when a new event has been generated and should be saved in history.

        Returns None
        """
        raise NotImplementedError

    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        """
        Called when a client makes a history read request for events.

        Start time and end time are inclusive.
        Returns a list of Events and a continuation point which
        is None if all events are read or the ServerTimeStamp of the last rejected event
        """
        raise NotImplementedError

    def stop(self):
        """
        Called when the server shuts down.

        Can be used to close database connections etc.
        """
        raise NotImplementedError
73
74
75 1
class HistoryDict(HistoryStorageInterface):
    """
    Very minimal history backend storing data in memory using a Python dictionary.

    Only data change history is implemented; saving and reading events
    raises NotImplementedError.
    """

    def __init__(self):
        # node_id -> list of saved DataValues, oldest first
        self._datachanges = {}
        # node_id -> (period, count) retention settings
        self._datachanges_period = {}
        self._events = {}

    def new_historized_node(self, node_id, period, count=0):
        """
        Start (or restart) the history of node_id with an empty value list.

        period is the maximum age of kept values and count the maximum
        number of kept values; a false value (None / 0) disables the
        corresponding limit.
        """
        self._datachanges[node_id] = []
        self._datachanges_period[node_id] = period, count

    def save_node_value(self, node_id, datavalue):
        """
        Append datavalue to the history of node_id and drop values which
        exceed the configured period or count.

        Raises KeyError if node_id has not been registered with
        new_historized_node.
        """
        data = self._datachanges[node_id]
        period, count = self._datachanges_period[node_id]
        data.append(datavalue)
        now = datetime.utcnow()
        if period:
            # purge everything older than the retention period
            while data and now - data[0].ServerTimestamp > period:
                data.pop(0)
        if count and len(data) > count:
            # only one value is added per call, so removing one is enough
            data.pop(0)

    def read_node_history(self, node_id, start, end, nb_values):
        """
        Return (values, continuation point) for the history of node_id.

        A start or end of None is treated as ua.DateTimeMinValue. When start
        is the minimum value, or start > end, values are returned newest
        first; otherwise oldest first. nb_values limits the number of
        returned values (ignored if 0); when values are left over, the
        continuation point is the ServerTimestamp of the first value which
        was not returned, otherwise it is None.
        Reading a node which is not historized returns ([], None).
        """
        cont = None
        if node_id not in self._datachanges:
            print("Error attempt to read history for a node which is not historized")
            return [], cont

        if start is None:
            start = ua.DateTimeMinValue
        if end is None:
            end = ua.DateTimeMinValue

        history = self._datachanges[node_id]
        if start == ua.DateTimeMinValue:
            # NOTE(review): end is not used as a bound in this branch - confirm this is intended
            results = [dv for dv in reversed(history) if start <= dv.ServerTimestamp]
        elif end == ua.DateTimeMinValue:
            results = [dv for dv in history if start <= dv.ServerTimestamp]
        elif start > end:
            results = [dv for dv in reversed(history) if end <= dv.ServerTimestamp <= start]
        else:
            results = [dv for dv in history if start <= dv.ServerTimestamp <= end]

        if nb_values and len(results) > nb_values:
            # results[nb_values] is the first value left out of this read;
            # indexing nb_values + 1 skipped a value and raised IndexError
            # when exactly one value was left over
            cont = results[nb_values].ServerTimestamp
            results = results[:nb_values]
        return results, cont

    def new_historized_event(self, source_id, etype, period):
        """
        Reset the in-memory event storage; the arguments are not used.
        """
        self._events = []

    def save_event(self, event):
        """
        Not implemented for this backend.
        """
        raise NotImplementedError

    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        """
        Not implemented for this backend.
        """
        raise NotImplementedError

    def stop(self):
        """
        Nothing to release for the in-memory backend.
        """
        pass
134
135
136 1
class SubHandler(object):
    """
    Subscription handler which forwards received notifications to a history storage.
    """

    def __init__(self, storage):
        # storage must provide save_node_value() and save_event()
        self.storage = storage

    def datachange_notification(self, node, val, data):
        """
        Save the changed value of node in the storage.

        The stored object is data.monitored_item.Value, not val.
        """
        self.storage.save_node_value(node.nodeid, data.monitored_item.Value)

    def event_notification(self, event):
        """
        Save the received event in the storage.
        """
        self.storage.save_event(event)
145
146
147 1
class HistoryManager(object):
    """
    Manage historizing of node data changes and events.

    Nodes are monitored through one internal subscription whose
    notifications are saved in the active storage (a HistoryDict unless
    set_storage is used).
    """

    def __init__(self, iserver):
        self.logger = logging.getLogger(__name__)
        self.iserver = iserver
        self.storage = HistoryDict()
        # internal subscription, created on the first historize_* call
        self._sub = None
        # historized node/source -> handle returned by the subscription
        self._handlers = {}

    def set_storage(self, storage):
        """
        set the desired HistoryStorageInterface which History Manager will use for historizing

        NOTE(review): the subscription handler is created with the storage
        which is active on the first historize_* call, so the storage should
        be set before historizing anything.
        """
        self.storage = storage

    def _create_subscription(self, handler):
        """
        Create the internal subscription delivering notifications to handler.
        """
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = 10
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 0
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.iserver.isession, params, handler)

    def historize_data_change(self, node, period=timedelta(days=7), count=0):
        """
        Subscribe to the nodes' data changes and store the data in the active storage.

        period and count are passed on to the storage's new_historized_node.
        Raises ua.UaError if the node is already historized.
        """
        if not self._sub:
            self._sub = self._create_subscription(SubHandler(self.storage))
        if node in self._handlers:
            raise ua.UaError("Node {} is already historized".format(node))
        self.storage.new_historized_node(node.nodeid, period, count)
        handler = self._sub.subscribe_data_change(node)
        self._handlers[node] = handler

    def historize_event(self, source, period=timedelta(days=7)):
        """
        Subscribe to the source nodes' events and store the data in the active storage.

        SQL Implementation
        The default is to historize every event type the source generates, custom event properties are included. At
        this time there is no way to historize a specific event type. The user software can filter out events which are
        not desired when reading.

        Note that adding custom events to a source node AFTER historizing has been activated is not supported at this
        time (in SQL history there will be no columns in the SQL table for the new event properties). For SQL The table
        must be deleted manually so that a new table with the custom event fields can be created.

        Raises ua.UaError if events of the source are already historized.
        """
        if not self._sub:
            self._sub = self._create_subscription(SubHandler(self.storage))
        if source in self._handlers:
            raise ua.UaError("Events from {} are already historized".format(source))

        # get the event types the source node generates and a list of all possible event fields
        event_types, ev_fields = self._get_source_event_data(source)

        self.storage.new_historized_event(source.nodeid, ev_fields, period)

        handler = self._sub.subscribe_events(source, event_types)
        self._handlers[source] = handler

    def dehistorize(self, node):
        """
        Remove subscription to the node/source which is being historized

        SQL Implementation
        Only the subscriptions is removed. The historical data remains.

        An error is logged if the node is not historized.
        """
        if node in self._handlers:
            self._sub.unsubscribe(self._handlers[node])
            del self._handlers[node]
        else:
            self.logger.error("History Manager isn't subscribed to %s", node)

    def read_history(self, params):
        """
        Read history for a node
        This is the part AttributeService, but implemented as its own service
        since it requires more logic than other attribute service methods

        Returns one HistoryReadResult per entry of params.NodesToRead.
        """
        return [self._read_history(params.HistoryReadDetails, rv) for rv in params.NodesToRead]

    def _read_history(self, details, rv):
        """
        determine if the history read is for a data changes or events; then read the history for that node
        """
        result = ua.HistoryReadResult()
        if isinstance(details, ua.ReadRawModifiedDetails):
            if details.IsReadModified:
                result.HistoryData = ua.HistoryModifiedData()
                # we do not support modified history by design so we return what we have
            else:
                result.HistoryData = ua.HistoryData()
            dv, cont = self._read_datavalue_history(rv, details)
            result.HistoryData.DataValues = dv
            result.ContinuationPoint = cont

        elif isinstance(details, ua.ReadEventDetails):
            result.HistoryData = ua.HistoryEvent()
            # FIXME: filter is a cumbersome type, maybe transform it something easier
            # to handle for storage
            ev, cont = self._read_event_history(rv, details)
            result.HistoryData.Events = ev
            result.ContinuationPoint = cont

        else:
            # we do not currently support the other types, clients can process data themselves
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
        return result

    @staticmethod
    def _get_start_time(rv, details):
        """
        Return the start time of a read: the time packed in the continuation point if there is one, else details.StartTime
        """
        if rv.ContinuationPoint:
            # Spec says we should ignore details if cont point is present
            # but they also say we can use cont point as timestamp to enable stateless
            # implementation. This is contradictory, so we assume details is
            # send correctly with continuation point
            return ua.unpack_datetime(utils.Buffer(rv.ContinuationPoint))
        return details.StartTime

    def _read_datavalue_history(self, rv, details):
        """
        Read data values of node rv.NodeId from the storage.

        Returns the values and the packed continuation point (or the false
        value returned by the storage when there is nothing left to read).
        """
        starttime = self._get_start_time(rv, details)

        dv, cont = self.storage.read_node_history(rv.NodeId,
                                                  starttime,
                                                  details.EndTime,
                                                  details.NumValuesPerNode)
        if cont:
            cont = ua.pack_datetime(cont)
        # rv.IndexRange and rv.DataEncoding (xml or binary) are not used,
        # it seems the spec says DataEncoding can be ignored
        return dv, cont

    def _read_event_history(self, rv, details):
        """
        Read events of source rv.NodeId from the storage.

        Returns the events and the packed continuation point (or the false
        value returned by the storage when there is nothing left to read).
        """
        starttime = self._get_start_time(rv, details)

        ev, cont = self.storage.read_event_history(rv.NodeId,
                                                   starttime,
                                                   details.EndTime,
                                                   details.NumValuesPerNode,
                                                   details.Filter)
        if cont:
            cont = ua.pack_datetime(cont)
        return ev, cont

    def _get_source_event_data(self, source):
        """
        Return the event types the source node can generate and the display names of all their fields.
        """
        # get all event types which the source node can generate; get the fields of those event types
        event_types = source.get_referenced_nodes(ua.ObjectIds.GeneratesEvent)

        ev_aggregate_fields = []
        for event_type in event_types:
            ev_aggregate_fields.extend(subscription.get_event_properties_from_type_node(event_type))

        # set() removes fields shared by several event types
        ev_fields = []
        for field in set(ev_aggregate_fields):
            ev_fields.append(field.get_display_name().Text.decode(encoding='utf-8'))
        return event_types, ev_fields

    def update_history(self, params):
        """
        Update history for a node
        This is the part AttributeService, but implemented as its own service
        since it requires more logic than other attribute service methods

        Returns one HistoryUpdateResult with status BadNotWritable per entry
        of params.HistoryUpdateDetails.
        """
        results = []
        for _ in params.HistoryUpdateDetails:
            result = ua.HistoryUpdateResult()
            # we do not accept to rewrite history
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotWritable)
            # append the result itself (the list used to be appended to itself)
            results.append(result)
        return results

    def stop(self):
        """
        call stop methods of active storage interface whenever the server is stopped
        """
        self.storage.stop()
336