Completed
Pull Request — master (#180)
by
unknown
03:21
created

HistoryManager.historize_data_change()   A

Complexity

Conditions 3

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3.0175
Metric Value
cc 3
dl 0
loc 11
ccs 7
cts 8
cp 0.875
crap 3.0175
rs 9.4285
1 1
import logging
2 1
from datetime import timedelta
3 1
from datetime import datetime
4
5 1
from opcua import Subscription
6 1
from opcua import ua
7 1
from opcua.common import utils
8 1
from opcua.common import subscription
9
10
11 1
class HistoryStorageInterface(object):
12
13
    """
14
    Interface of a history backend.
15
    Must be implemented by backends
16
    """
17
18 1
    def new_historized_node(self, node_id, period, count=0):
19
        """
20
        Called when a new node is to be historized
21
        Returns None
22
        """
23
        raise NotImplementedError
24
25 1
    def save_node_value(self, node_id, datavalue):
26
        """
27
        Called when the value of a historized node has changed and should be saved in history
28
        Returns None
29
        """
30
        raise NotImplementedError
31
32 1
    def read_node_history(self, node_id, start, end, nb_values):
33
        """
34
        Called when a client make a history read request for a node
35
        if start or end is missing then nb_values is used to limit query
36
        nb_values is the max number of values to read. Ignored if 0
37
        Start time and end time are inclusive
38
        Returns a list of DataValues and a continuation point which
39
        is None if all nodes are read or the ServerTimeStamp of the last rejected DataValue
40
        """
41
        raise NotImplementedError
42
43 1
    def new_historized_event(self, source_id, etype, period):
44
        """
45
        Called when historization of events is enabled on server side
46
        FIXME: we may need to store events per nodes in future...
47
        Returns None
48
        """
49
        raise NotImplementedError
50
51 1
    def save_event(self, event):
52
        """
53
        Called when a new event has been generated ans should be saved in history
54
        Returns None
55
        """
56
        raise NotImplementedError
57
58 1
    def read_event_history(self, source_id, start, end, nb_values, evfilter):
59
        """
60
        Called when a client make a history read request for events
61
        Start time and end time are inclusive
62
        Returns a list of Events and a continuation point which
63
        is None if all events are read or the ServerTimeStamp of the last rejected event
64
        """
65
        raise NotImplementedError
66
67 1
    def stop(self):
68
        """
69
        Called when the server shuts down
70
        Can be used to close database connections etc.
71
        """
72
        raise NotImplementedError
73
74
75 1
class HistoryDict(HistoryStorageInterface):
76
    """
77
    very minimal history backend storing data in memory using a Python dictionary
78
    """
79 1
    def __init__(self):
80 1
        self._datachanges = {}
81 1
        self._datachanges_period = {}
82 1
        self._events = {}
83
84 1
    def new_historized_node(self, node_id, period, count=0):
85 1
        self._datachanges[node_id] = []
86 1
        self._datachanges_period[node_id] = period, count
87
88 1
    def save_node_value(self, node_id, datavalue):
89 1
        data = self._datachanges[node_id]
90 1
        period, count = self._datachanges_period[node_id]
91 1
        data.append(datavalue)
92 1
        now = datetime.utcnow()
93 1
        if period:
94
            while now - data[0].ServerTimestamp > period:
95
                data.pop(0)
96 1
        if count and len(data) > count:
97 1
            data = data[-count:]
98
99 1
    def read_node_history(self, node_id, start, end, nb_values):
100 1
        cont = None
101 1
        if node_id not in self._datachanges:
102
            print("Error attempt to read history for a node which is not historized")
103
            return [], cont
104
        else:
105 1
            if start is None:
106
                start = ua.DateTimeMinValue
107 1
            if end is None:
108
                end = ua.DateTimeMinValue
109 1
            if start == ua.DateTimeMinValue:
110 1
                results = [dv for dv in reversed(self._datachanges[node_id]) if start <= dv.ServerTimestamp]
111 1
            elif end == ua.DateTimeMinValue:
112 1
                results = [dv for dv in self._datachanges[node_id] if start <= dv.ServerTimestamp]
113 1
            elif start > end:
114 1
                results = [dv for dv in reversed(self._datachanges[node_id]) if end <= dv.ServerTimestamp <= start]
115
116
            else:
117 1
                results = [dv for dv in self._datachanges[node_id] if start <= dv.ServerTimestamp <= end]
118 1
            if nb_values and len(results) > nb_values:
119 1
                cont = results[nb_values + 1].ServerTimestamp
120 1
                results = results[:nb_values]
121 1
            return results, cont
122
123 1
    def new_historized_event(self, source_id, etype, period):
124 1
        self._events = []
125
126 1
    def save_event(self, event):
127
        raise NotImplementedError
128
129 1
    def read_event_history(self, source_id, start, end, nb_values, evfilter):
130
        raise NotImplementedError
131
132 1
    def stop(self):
133 1
        pass
134
135
136 1
class SubHandler(object):
137 1
    def __init__(self, storage):
138 1
        self.storage = storage
139
140 1
    def datachange_notification(self, node, val, data):
141 1
        self.storage.save_node_value(node.nodeid, data.monitored_item.Value)
142
143 1
    def event_notification(self, event):
144 1
        self.storage.save_event(event)
145
146
147 1
class HistoryManager(object):
148 1
    def __init__(self, iserver):
149 1
        self.logger = logging.getLogger(__name__)
150 1
        self.iserver = iserver
151 1
        self.storage = HistoryDict()
152 1
        self._sub = None
153 1
        self._handlers = {}
154
155 1
    def set_storage(self, storage):
156
        """
157
        set the desired HistoryStorageInterface which History Manager will use for historizing
158
        """
159 1
        self.storage = storage
160
161 1
    def _create_subscription(self, handler):
162 1
        params = ua.CreateSubscriptionParameters()
163 1
        params.RequestedPublishingInterval = 10
164 1
        params.RequestedLifetimeCount = 3000
165 1
        params.RequestedMaxKeepAliveCount = 10000
166 1
        params.MaxNotificationsPerPublish = 0
167 1
        params.PublishingEnabled = True
168 1
        params.Priority = 0
169 1
        return Subscription(self.iserver.isession, params, handler)
170
171 1
    def historize_data_change(self, node, period=timedelta(days=7), count=0):
172
        """
173
        subscribe to the nodes' data changes and store the data in the active storage
174
        """
175 1
        if not self._sub:
176 1
            self._sub = self._create_subscription(SubHandler(self.storage))
177 1
        if node in self._handlers:
178
            raise ua.UaError("Node {} is already historized".format(node))
179 1
        self.storage.new_historized_node(node.nodeid, period, count)
180 1
        handler = self._sub.subscribe_data_change(node)
181 1
        self._handlers[node] = handler
182
183 1
    def historize_event(self, source, period=timedelta(days=7)):
184
        """
185
        subscribe to the source nodes' events and store the data in the active storage; custom event properties included
186
        """
187 1
        if not self._sub:
188 1
            self._sub = self._create_subscription(SubHandler(self.storage))
189 1
        if source in self._handlers:
190
            raise ua.UaError("Events from {} are already historized".format(source))
191
192
        # get the event types the source node generates and a list of all possible event fields
193 1
        event_types, ev_fields = self._get_source_event_data(source)
194
195 1
        self.storage.new_historized_event(source.nodeid, ev_fields, period)
196
197 1
        handler = self._sub.subscribe_events(source)  # FIXME supply list of event types when master is fixed
198 1
        self._handlers[source] = handler
199
200 1
    def dehistorize(self, node):
201
        """
202
        remove subscription to the node/source which is being historized
203
        """
204 1
        if node in self._handlers:
205 1
            self._sub.unsubscribe(self._handlers[node])
206 1
            del(self._handlers[node])
207
        else:
208
            self.logger.error("History Manager isn't subscribed to %s", node)
209
210 1
    def read_history(self, params):
211
        """
212
        Read history for a node
213
        This is the part AttributeService, but implemented as its own service
214
        since it requires more logic than other attribute service methods
215
        """
216 1
        results = []
217
        
218 1
        for rv in params.NodesToRead:
219 1
            res = self._read_history(params.HistoryReadDetails, rv)
220 1
            results.append(res)
221 1
        return results
222
        
223 1
    def _read_history(self, details, rv):
224
        """
225
        determine if the history read is for a data changes or events; then read the history for that node
226
        """
227 1
        result = ua.HistoryReadResult()
228 1
        if isinstance(details, ua.ReadRawModifiedDetails):
229 1
            if details.IsReadModified:
230
                result.HistoryData = ua.HistoryModifiedData()
231
                # we do not support modified history by design so we return what we have
232
            else:
233 1
                result.HistoryData = ua.HistoryData()
234 1
            dv, cont = self._read_datavalue_history(rv, details)
235 1
            result.HistoryData.DataValues = dv
236 1
            result.ContinuationPoint = cont
237
238 1
        elif isinstance(details, ua.ReadEventDetails):
239 1
            result.HistoryData = ua.HistoryEvent()
240
            # FIXME: filter is a cumbersome type, maybe transform it something easier
241
            # to handle for storage
242 1
            ev, cont = self._read_event_history(rv, details)
243 1
            result.HistoryData.Events = ev
244 1
            result.ContinuationPoint = cont
245
246
        else:
247
            # we do not currently support the other types, clients can process data themselves
248
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
249 1
        return result
250
251 1
    def _read_datavalue_history(self, rv, details):
252 1
        starttime = details.StartTime
253 1
        if rv.ContinuationPoint:
254
            # Spec says we should ignore details if cont point is present
255
            # but they also say we can use cont point as timestamp to enable stateless
256
            # implementation. This is contradictory, so we assume details is
257
            # send correctly with continuation point
258
            # starttime = bytes_to_datetime(rv.ContinuationPoint)
259
            starttime = ua.unpack_datetime(utils.Buffer(rv.ContinuationPoint))
260
261 1
        dv, cont = self.storage.read_node_history(rv.NodeId,
262
                                                  starttime,
263
                                                  details.EndTime,
264
                                                  details.NumValuesPerNode)
265 1
        if cont:
266
            # cont = datetime_to_bytes(dv[-1].ServerTimestamp)
267 1
            cont = ua.pack_datetime(cont)
268
        # rv.IndexRange
269
        # rv.DataEncoding # xml or binary, seems spec say we can ignore that one
270 1
        return dv, cont
271
272 1
    def _read_event_history(self, rv, details):
273 1
        starttime = details.StartTime
274 1
        if rv.ContinuationPoint:
275
            # Spec says we should ignore details if cont point is present
276
            # but they also say we can use cont point as timestamp to enable stateless
277
            # implementation. This is contradictory, so we assume details is
278
            # send correctly with continuation point
279
            # starttime = bytes_to_datetime(rv.ContinuationPoint)
280
            starttime = ua.unpack_datetime(utils.Buffer(rv.ContinuationPoint))
281
282 1
        ev, cont = self.storage.read_event_history(rv.NodeId,
283
                                                   starttime,
284
                                                   details.EndTime,
285
                                                   details.NumValuesPerNode,
286
                                                   details.Filter)
287 1
        if cont:
288
            # cont = datetime_to_bytes(dv[-1].ServerTimestamp)
289 1
            cont = ua.pack_datetime(cont)
290 1
        return ev, cont
291
292 1
    def _get_source_event_data(self, source):
293
        # get all event types which the source node can generate; get the fields of those event types
294 1
        event_types = source.get_referenced_nodes(ua.ObjectIds.GeneratesEvent)
295
296 1
        ev_aggregate_fields = []
297 1
        for event_type in event_types:
298 1
            ev_aggregate_fields.extend((subscription.get_event_properties_from_type_node(event_type)))
299
300 1
        ev_fields = []
301 1
        for field in set(ev_aggregate_fields):
302 1
            ev_fields.append(field.get_display_name().Text.decode(encoding='utf-8'))
303 1
        return event_types, ev_fields
304
305 1
    def update_history(self, params):
306
        """
307
        Update history for a node
308
        This is the part AttributeService, but implemented as its own service
309
        since it requires more logic than other attribute service methods
310
        """
311
        results = []
312
        for _ in params.HistoryUpdateDetails:
313
            result = ua.HistoryUpdateResult()
314
            # we do not accept to rewrite history
315
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotWritable)
316
            results.append(results)
317
        return results
318
319 1
    def stop(self):
320
        """
321
        call stop methods of active storage interface whenever the server is stopped
322
        """
323
        self.storage.stop()
324