Completed
Push — master ( 175840...68f055 )
by Olivier
03:53 queued 27s
created

HistoryDict.read_event_history()   F

Complexity

Conditions 17

Size

Total Lines 23

Duplication

Lines 23
Ratio 100 %

Code Coverage

Tests 17
CRAP Score 17

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 17
c 2
b 0
f 0
dl 23
loc 23
ccs 17
cts 17
cp 1
crap 17
rs 3.0143

How to fix   Complexity   

Complexity

Complex code units like HistoryDict.read_event_history() often do a lot of different things. To break such a unit down, we need to identify a cohesive component within its class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
import logging
2 1
from datetime import timedelta
3 1
from datetime import datetime
4
5 1
from opcua import Subscription
6 1
from opcua import ua
7 1
from opcua.common import utils
8 1
from opcua.common import events
9
10
11 1
class UaNodeAlreadyHistorizedError(ua.UaError):
    """
    Error raised when a node or event source is registered for historizing twice
    """
13
14
15
class HistoryStorageInterface(object):
    """
    Contract of a history backend.

    Every method raises NotImplementedError here and has to be
    overridden by a concrete backend.
    """

    def new_historized_node(self, node_id, period, count=0):
        """
        Hook invoked when a new node is about to be historized.
        Returns None
        """
        raise NotImplementedError()

    def save_node_value(self, node_id, datavalue):
        """
        Hook invoked when the value of a historized node changed and
        has to be stored in the history.
        Returns None
        """
        raise NotImplementedError()

    def read_node_history(self, node_id, start, end, nb_values):
        """
        Hook invoked when a client issues a history read request for a node.

        When start or end is missing, nb_values limits the query.
        nb_values is the maximum number of values to read; 0 means no limit.
        Both start time and end time are inclusive.

        Returns a list of DataValues together with a continuation point.
        The continuation point is None when everything has been read,
        otherwise it is the ServerTimeStamp of the last rejected DataValue.
        """
        raise NotImplementedError()

    def new_historized_event(self, source_id, etype, period, count=0):
        """
        Hook invoked when event historization is enabled on the server side.
        Returns None
        """
        raise NotImplementedError()

    def save_event(self, event):
        """
        Hook invoked when a new event was generated and has to be
        stored in the history.
        Returns None
        """
        raise NotImplementedError()

    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        """
        Hook invoked when a client issues a history read request for events.

        Both start time and end time are inclusive.

        Returns a list of Events together with a continuation point.
        The continuation point is None when all events have been read,
        otherwise it is the ServerTimeStamp of the last rejected event.
        """
        raise NotImplementedError()

    def stop(self):
        """
        Hook invoked when the server shuts down.
        Backends can use it to close database connections etc.
        """
        raise NotImplementedError()
76
77
78
class HistoryDict(HistoryStorageInterface):
    """
    very minimal history backend storing data in memory using a Python dictionary
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        # node id -> list of stored DataValues, oldest first
        self._datachanges = {}
        # node id -> (period, count) retention settings
        self._datachanges_period = {}
        # source node id -> list of stored events, oldest first
        self._events = {}
        # source node id -> (period, count) retention settings
        self._events_periods = {}

    @staticmethod
    def _trim(items, period, count):
        """
        Drop the oldest entries of *items* (in place) so the list respects
        the retention settings.

        period: maximum age of an entry as a timedelta; ignored if falsy
        count: maximum number of entries; ignored if falsy
        """
        if period:
            now = datetime.utcnow()
            while items and now - items[0].ServerTimestamp > period:
                items.pop(0)
        if count and len(items) > count:
            # remove everything in excess, not just a single entry
            del items[:len(items) - count]

    @staticmethod
    def _select(items, start, end, nb_values, timestamp):
        """
        Pick the entries of *items* matching a history read request.

        items: stored entries, oldest first
        start, end: inclusive time bounds; None is treated as ua.DateTimeMinValue
        nb_values: maximum number of entries to return; ignored if 0
        timestamp: callable returning the time used to compare an entry

        Returns (results, cont) where cont is None when nothing was cut off,
        otherwise the timestamp of the first entry that was not returned.
        """
        if start is None:
            start = ua.DateTimeMinValue
        if end is None:
            end = ua.DateTimeMinValue

        if start == ua.DateTimeMinValue:
            # no start time: newest entries first
            results = [item for item in reversed(items) if start <= timestamp(item)]
        elif end == ua.DateTimeMinValue:
            # no end time: oldest entries first, beginning at start
            results = [item for item in items if start <= timestamp(item)]
        elif start > end:
            # bounds given in reverse order: newest entries first
            results = [item for item in reversed(items) if end <= timestamp(item) <= start]
        else:
            results = [item for item in items if start <= timestamp(item) <= end]

        cont = None
        if nb_values and len(results) > nb_values:
            # results[nb_values] is the first entry left out of this answer;
            # indexing nb_values + 1 would skip an entry and raise IndexError
            # when exactly one entry exceeds the limit
            cont = timestamp(results[nb_values])
            results = results[:nb_values]
        return results, cont

    def new_historized_node(self, node_id, period, count=0):
        """
        Start keeping history for node_id.

        Raises UaNodeAlreadyHistorizedError if the node is already registered.
        """
        if node_id in self._datachanges:
            raise UaNodeAlreadyHistorizedError(node_id)
        self._datachanges[node_id] = []
        self._datachanges_period[node_id] = period, count

    def save_node_value(self, node_id, datavalue):
        """
        Append datavalue to the history of node_id and apply the retention settings.

        Raises KeyError if node_id has not been registered with new_historized_node.
        """
        data = self._datachanges[node_id]
        period, count = self._datachanges_period[node_id]
        data.append(datavalue)
        self._trim(data, period, count)

    def read_node_history(self, node_id, start, end, nb_values):
        """
        Return (datavalues, continuation point) for node_id.

        An unknown node is logged as an error and yields ([], None).
        DataValues are compared by their ServerTimestamp.
        """
        if node_id not in self._datachanges:
            self.logger.error("Error attempt to read history for a node which is not historized")
            return [], None
        return self._select(self._datachanges[node_id], start, end, nb_values,
                            lambda dv: dv.ServerTimestamp)

    def new_historized_event(self, source_id, etype, period, count=0):
        """
        Start keeping event history for source_id.

        etype is accepted for interface compatibility and not used by this backend.
        Raises UaNodeAlreadyHistorizedError if the source is already registered.
        """
        if source_id in self._events:
            raise UaNodeAlreadyHistorizedError(source_id)
        self._events[source_id] = []
        self._events_periods[source_id] = period, count

    def save_event(self, event):
        """
        Append event to the history of its SourceNode and apply the retention settings.

        Raises KeyError if the source has not been registered with new_historized_event.
        """
        evts = self._events[event.SourceNode]
        period, count = self._events_periods[event.SourceNode]
        evts.append(event)
        # NOTE(review): trimming compares ServerTimestamp while reading compares
        # Time -- confirm that events really carry a ServerTimestamp attribute
        self._trim(evts, period, count)

    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        """
        Return (events, continuation point) for source_id.

        An unknown source is logged as an error and yields ([], None).
        Events are compared by their Time attribute; evfilter is not used
        by this backend.
        """
        if source_id not in self._events:
            self.logger.error("Error attempt to read event history for a node which does not historize events")
            return [], None
        return self._select(self._events[source_id], start, end, nb_values,
                            lambda ev: ev.Time)

    def stop(self):
        """
        Nothing to release for the in-memory backend
        """
        pass
172
173
174
class SubHandler(object):
    """
    Subscription handler which hands every notification over to a history storage
    """

    def __init__(self, storage):
        self.storage = storage

    def datachange_notification(self, node, val, data):
        """
        Save the DataValue of the monitored item under the id of the node
        """
        node_id = node.nodeid
        datavalue = data.monitored_item.Value
        self.storage.save_node_value(node_id, datavalue)

    def event_notification(self, event):
        """
        Save the received event
        """
        storage = self.storage
        storage.save_event(event)
183 1
184
185
class HistoryManager(object):
    """
    Subscribes to nodes and event sources, stores what it receives in the
    active history storage and answers history read/update requests.
    """

    def __init__(self, iserver):
        self.logger = logging.getLogger(__name__)
        self.iserver = iserver
        self.storage = HistoryDict()
        # internal subscription, created on the first historize_* call
        self._sub = None
        # node/source -> handle returned by the subscription
        self._handlers = {}

    def set_storage(self, storage):
        """
        set the desired HistoryStorageInterface which History Manager will use for historizing
        """
        # the SubHandler of an already created subscription keeps the storage
        # that was active when the subscription was created
        self.storage = storage

    def _create_subscription(self, handler):
        """
        create the internal subscription which delivers notifications to handler
        """
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = 10
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 0
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.iserver.isession, params, handler)

    def historize_data_change(self, node, period=timedelta(days=7), count=0):
        """
        subscribe to the nodes' data changes and store the data in the active storage

        period and count are handed to the storage as retention settings.
        Raises ua.UaError if the node is already historized.
        """
        if not self._sub:
            self._sub = self._create_subscription(SubHandler(self.storage))
        if node in self._handlers:
            raise ua.UaError("Node {} is already historized".format(node))
        self.storage.new_historized_node(node.nodeid, period, count)
        handler = self._sub.subscribe_data_change(node)
        self._handlers[node] = handler

    def historize_event(self, source, period=timedelta(days=7), count=0):
        """
        subscribe to the source nodes' events and store the data in the active storage; custom event properties included

        period and count are handed to the storage as retention settings.
        Raises ua.UaError if events of the source are already historized.
        """
        if not self._sub:
            self._sub = self._create_subscription(SubHandler(self.storage))
        if source in self._handlers:
            raise ua.UaError("Events from {} are already historized".format(source))

        # get the event types the source node generates and a list of all possible event fields
        event_types, ev_fields = self._get_source_event_data(source)

        self.storage.new_historized_event(source.nodeid, ev_fields, period, count)

        handler = self._sub.subscribe_events(source)  # FIXME supply list of event types when master is fixed
        self._handlers[source] = handler

    def dehistorize(self, node):
        """
        remove subscription to the node/source which is being historized
        """
        if node in self._handlers:
            self._sub.unsubscribe(self._handlers[node])
            del self._handlers[node]
        else:
            self.logger.error("History Manager isn't subscribed to %s", node)

    def read_history(self, params):
        """
        Read history for a node
        This is the part AttributeService, but implemented as its own service
        since it requires more logic than other attribute service methods

        Returns one HistoryReadResult per entry of params.NodesToRead.
        """
        return [self._read_history(params.HistoryReadDetails, rv) for rv in params.NodesToRead]

    def _read_history(self, details, rv):
        """
        determine if the history read is for a data changes or events; then read the history for that node
        """
        result = ua.HistoryReadResult()
        if isinstance(details, ua.ReadRawModifiedDetails):
            if details.IsReadModified:
                result.HistoryData = ua.HistoryModifiedData()
                # we do not support modified history by design so we return what we have
            else:
                result.HistoryData = ua.HistoryData()
            dv, cont = self._read_datavalue_history(rv, details)
            result.HistoryData.DataValues = dv
            result.ContinuationPoint = cont

        elif isinstance(details, ua.ReadEventDetails):
            result.HistoryData = ua.HistoryEvent()
            # FIXME: filter is a cumbersome type, maybe transform it something easier
            # to handle for storage
            ev, cont = self._read_event_history(rv, details)
            result.HistoryData.Events = ev
            result.ContinuationPoint = cont

        else:
            # we do not currently support the other types, clients can process data themselves
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
        return result

    @staticmethod
    def _get_start_time(rv, details):
        """
        return the time at which a history read has to start
        """
        if rv.ContinuationPoint:
            # Spec says we should ignore details if cont point is present
            # but they also say we can use cont point as timestamp to enable stateless
            # implementation. This is contradictory, so we assume details is
            # send correctly with continuation point
            return ua.unpack_datetime(utils.Buffer(rv.ContinuationPoint))
        return details.StartTime

    def _read_datavalue_history(self, rv, details):
        """
        read the data change history of rv.NodeId from the storage

        Returns (datavalues, continuation point); the continuation point is
        packed with ua.pack_datetime when the storage reported one.
        """
        starttime = self._get_start_time(rv, details)
        dv, cont = self.storage.read_node_history(rv.NodeId,
                                                  starttime,
                                                  details.EndTime,
                                                  details.NumValuesPerNode)
        if cont:
            cont = ua.pack_datetime(cont)
        # rv.IndexRange
        # rv.DataEncoding # xml or binary, seems spec say we can ignore that one
        return dv, cont

    def _read_event_history(self, rv, details):
        """
        read the event history of rv.NodeId from the storage

        Returns (list of HistoryEventFieldList, continuation point); the
        continuation point is packed with ua.pack_datetime when the storage
        reported one.
        """
        starttime = self._get_start_time(rv, details)
        evts, cont = self.storage.read_event_history(rv.NodeId,
                                                     starttime,
                                                     details.EndTime,
                                                     details.NumValuesPerNode,
                                                     details.Filter)
        results = []
        for ev in evts:
            field_list = ua.HistoryEventFieldList()
            field_list.EventFields = ev.to_event_fields(details.Filter.SelectClauses)
            results.append(field_list)
        if cont:
            cont = ua.pack_datetime(cont)
        return results, cont

    def _get_source_event_data(self, source):
        """
        return (event types, field names) for the events the source node generates
        """
        # get all event types which the source node can generate; get the fields of those event types
        event_types = source.get_referenced_nodes(ua.ObjectIds.GeneratesEvent)

        ev_aggregate_fields = []
        for event_type in event_types:
            ev_aggregate_fields.extend((events.get_event_properties_from_type_node(event_type)))

        # set() removes fields shared by several event types
        ev_fields = []
        for field in set(ev_aggregate_fields):
            ev_fields.append(field.get_display_name().Text.decode(encoding='utf-8'))
        return event_types, ev_fields

    def update_history(self, params):
        """
        Update history for a node
        This is the part AttributeService, but implemented as its own service
        since it requires more logic than other attribute service methods

        Returns one HistoryUpdateResult with status BadNotWritable per entry
        of params.HistoryUpdateDetails.
        """
        results = []
        for _ in params.HistoryUpdateDetails:
            result = ua.HistoryUpdateResult()
            # we do not accept to rewrite history
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotWritable)
            # append the individual result, not the list itself
            results.append(result)
        return results

    def stop(self):
        """
        call stop methods of active storage interface whenever the server is stopped
        """
        self.storage.stop()
363