Completed
Pull Request — master (#140)
by Olivier
02:36
created

opcua.server.HistoryManager.historize()   A

Complexity

Conditions 3

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3.0175
Metric Value
cc 3
dl 0
loc 8
ccs 7
cts 8
cp 0.875
crap 3.0175
rs 9.4285
1 1
from datetime import timedelta
2 1
from datetime import datetime
3
4 1
from opcua import Subscription
5 1
from opcua import ua
6
7
8 1
class HistoryStorageInterface(object):
    """
    Interface of a history backend.

    Every method here raises NotImplementedError; backends must override
    the ones they support.
    """

    def new_historized_node(self, node, period, count=0):
        """
        Called when a new node is to be historized.

        period and count are retention settings; how they are applied is
        up to the backend.
        """
        raise NotImplementedError

    def save_node_value(self, node, datavalue):
        """
        Called when the value of a historized node has changed and should be saved in history.
        """
        raise NotImplementedError

    def read_node_history(self, node, start, end, nb_values):
        """
        Called when a client makes a history read request for a node.

        If start or end is missing then nb_values is used to limit the query.
        nb_values is the max number of values to read. Ignored if 0.
        """
        raise NotImplementedError

    def new_historized_event(self, event, period):
        """
        Called when historization of events is enabled on server side.

        FIXME: we may need to store events per nodes in future...
        """
        raise NotImplementedError

    def save_event(self, event):
        """
        Called when a new event has been generated and should be saved in history.
        """
        raise NotImplementedError

    def read_event_history(self, start, end, evfilter):
        """
        Called when a client makes a history read request for events.
        """
        raise NotImplementedError
51
52
53 1
class HistoryDict(HistoryStorageInterface):
    """
    Very minimal history backend storing data in memory using a Python dictionary.

    Event history is not implemented: save_event and read_event_history
    raise NotImplementedError.
    """

    def __init__(self):
        # node -> list of saved data values, in insertion order (oldest first)
        self._datachanges = {}
        # node -> (period, count) retention settings given at registration
        self._datachanges_period = {}
        self._events = {}

    def new_historized_node(self, node, period, count=0):
        """
        Start (or restart) an empty history for node.

        period is the maximum age of kept values (falsy: no age limit),
        count the maximum number of kept values (0: no limit).
        """
        self._datachanges[node] = []
        self._datachanges_period[node] = period, count

    def new_historized_event(self, event, period):
        """
        Reset the event container; event and period are currently unused.
        """
        # NOTE(review): __init__ creates _events as a dict while it is reset
        # to a list here; nothing reads it yet, so the intended type is open.
        self._events = []

    def save_node_value(self, node, datavalue):
        """
        Append datavalue to the history of node and apply the retention limits.

        Raises KeyError if node was never registered with new_historized_node.
        """
        data = self._datachanges[node]
        period, count = self._datachanges_period[node]
        data.append(datavalue)
        # NOTE(review): naive local time is compared with ServerTimestamp;
        # confirm both use the same clock/timezone.
        now = datetime.now()
        if period:
            # Guard on emptiness: if every entry (including the one just
            # appended) is older than period, the list drains completely.
            while data and now - data[0].ServerTimestamp > period:
                data.pop(0)
        if count and len(data) > count:
            # Trim in place; rebinding the local name would leave the
            # stored list untouched and the limit would never apply.
            del data[:-count]

    def read_node_history(self, node, start, end, nb_values):
        """
        Return (values, continuation) for node between start and end.

        A missing end defaults to one day from now, a missing start to
        ua.DateTimeMinValue. When nb_values is non-zero the result is
        limited to nb_values entries: the oldest ones if an explicit start
        was given and more matched (continuation is then the timestamp of
        the last returned value), otherwise the most recent ones.
        An unknown node yields ([], None).
        """
        cont = None
        if node not in self._datachanges:
            print("Error attempt to read history for a node which is not historized")
            return [], cont
        if end is None:
            end = datetime.now() + timedelta(days=1)
        if start is None:
            start = ua.DateTimeMinValue
        results = [dv for dv in self._datachanges[node] if start <= dv.ServerTimestamp <= end]
        if nb_values:
            if start > ua.DateTimeMinValue and len(results) > nb_values:
                results = results[:nb_values]
                cont = results[-1].ServerTimestamp
            else:
                results = results[-nb_values:]
        return results, cont

    def save_event(self, event):
        """Event history is not supported by this backend."""
        raise NotImplementedError

    def read_event_history(self, start, end, evfilter):
        """Event history is not supported by this backend."""
        raise NotImplementedError
104
105
106 1
class SubHandler(object):
    """
    Subscription handler that forwards notifications to a history storage.
    """

    def __init__(self, storage):
        # storage must provide save_node_value and save_event
        self.storage = storage

    def datachange_notification(self, node, val, data):
        """
        Save the changed value of node in the storage.

        val is unused: the value object carried by the monitored item
        notification is stored instead, keyed by the node's nodeid.
        """
        self.storage.save_node_value(node.nodeid, data.monitored_item.Value)

    def event_notification(self, event):
        """
        Save the received event in the storage.
        """
        self.storage.save_event(event)
116
117
118 1
class HistoryManager(object):
    """
    Server-side history service.

    Subscribes to data changes of historized nodes, stores them through a
    storage backend and answers history read/update requests.
    """

    def __init__(self, iserver):
        self.iserver = iserver
        self.storage = HistoryDict()
        # internal subscription, created lazily by the first historize() call
        self._sub = None
        # node -> handle returned by subscribe_data_change
        self._handlers = {}

    def set_storage(self, storage):
        """
        Replace the storage backend.

        NOTE(review): the subscription handler created by historize() keeps
        a reference to the storage that was current at that time, so this
        should be called before the first historize().
        """
        self.storage = storage

    def _create_subscription(self, handler):
        """Create the internal subscription delivering notifications to handler."""
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = 10
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 0
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.iserver.isession, params, handler)

    def historize(self, node, period=timedelta(days=7), count=0):
        """
        Start recording the data changes of node.

        period and count are passed to the storage as retention settings.
        Raises ua.UaError if node is already historized.
        """
        if not self._sub:
            self._sub = self._create_subscription(SubHandler(self.storage))
        if node in self._handlers:
            raise ua.UaError("Node {} is allready historized".format(node))
        self.storage.new_historized_node(node.nodeid, period, count)
        handler = self._sub.subscribe_data_change(node)
        self._handlers[node] = handler

    def dehistorize(self, node):
        """
        Stop recording the data changes of node.

        Raises KeyError if node is not currently historized.
        """
        self._sub.unsubscribe(self._handlers[node])
        del self._handlers[node]

    def read_history(self, params):
        """
        Read history for a node
        This is the part AttributeService, but implemented as its own service
        since it requires more logic than other attribute service methods

        Returns one result per entry of params.NodesToRead, in order.
        """
        return [self._read_history(params.HistoryReadDetails, rv)
                for rv in params.NodesToRead]

    def _read_history(self, details, rv):
        """
        Read history for a single node and return a ua.HistoryReadResult.
        """
        result = ua.HistoryReadResult()
        if isinstance(details, ua.ReadRawModifiedDetails):
            if details.IsReadModified:
                result.HistoryData = ua.HistoryModifiedData()
                # we do not support modified history by design so we return what we have
            else:
                result.HistoryData = ua.HistoryData()
            dv, cont = self._read_datavalue_history(rv, details)
            result.HistoryData.DataValues = dv
            result.ContinuationPoint = cont

        elif isinstance(details, ua.ReadEventDetails):
            result.HistoryData = ua.HistoryEvent()
            # FIXME: filter is a cumbersome type, maybe transform it something easier
            # to handle for storage
            result.HistoryData.Events = self.storage.read_event_history(details.StartTime,
                                                                        details.EndTime,
                                                                        details.Filter)
        else:
            # we do not currently support the other types, clients can process data themselves
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
        return result

    def _read_datavalue_history(self, rv, details):
        """
        Query the storage for the data values of rv.NodeId.

        Returns (values, continuation point); the continuation point is the
        packed ServerTimestamp of the last returned value, or the falsy
        value from the storage when there is nothing more to read.
        """
        starttime = details.StartTime
        if rv.ContinuationPoint:
            # Spec says we should ignore details if cont point is present
            # but they also say we can use cont point as timestamp to enable stateless
            # implementation. This is contradictory, so we assume details is
            # send correctly with continuation point
            starttime = ua.unpack_datetime(rv.ContinuationPoint)

        dv, cont = self.storage.read_node_history(rv.NodeId,
                                                  starttime,
                                                  details.EndTime,
                                                  details.NumValuesPerNode)
        if cont:
            cont = ua.pack_datetime(dv[-1].ServerTimestamp)
        # FIXME, parse index range and filter out if necesary
        # rv.IndexRange
        # rv.DataEncoding # xml or binary, seems spec say we can ignore that one
        return dv, cont

    def update_history(self, params):
        """
        Update history for a node
        This is the part AttributeService, but implemented as its own service
        since it requires more logic than other attribute service methods

        Every update is refused: one BadNotWritable result is returned per
        entry of params.HistoryUpdateDetails.
        """
        results = []
        for _ in params.HistoryUpdateDetails:
            result = ua.HistoryUpdateResult()
            # we do not accept to rewrite history
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotWritable)
            # append the per-item result, not the list itself
            results.append(result)
        return results
225
226
227
228
229