Completed
Pull Request — master (#158)
by Olivier
02:32
created

HistoryManager.stop()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2
Metric Value
cc 1
dl 0
loc 2
rs 10
ccs 0
cts 0
cp 0
crap 2
1 1
from datetime import timedelta
2 1
from datetime import datetime
3
4 1
from opcua import Subscription
5 1
from opcua import ua
6 1
from opcua.common import utils
7
8
9 1
class HistoryStorageInterface(object):
    """
    Interface of a history backend.

    Every method raises NotImplementedError here; backends must override
    all of them.
    """

    def new_historized_node(self, node_id, period, count=0):
        """
        Called when a new node is to be historized.

        node_id: id of the node whose values will be stored
        period: how long values should be kept; the in-memory backend in this
            file compares it against a datetime difference and skips the
            age limit when it is falsy
        count: maximum number of values to keep; 0 means no limit in the
            in-memory backend
        Returns None
        """
        raise NotImplementedError

    def save_node_value(self, node_id, datavalue):
        """
        Called when the value of a historized node has changed and should be
        saved in history.

        Returns None
        """
        raise NotImplementedError

    def read_node_history(self, node_id, start, end, nb_values):
        """
        Called when a client makes a history read request for a node.

        If start or end is missing then nb_values is used to limit the query.
        nb_values is the max number of values to read. Ignored if 0.
        Start time and end time are inclusive.
        Returns a list of DataValues and a continuation point which is None
        if all values are read, or the ServerTimestamp of a DataValue that
        was left out of the result.
        """
        raise NotImplementedError

    def new_historized_event(self, event, period):
        """
        Called when historization of events is enabled on server side.

        FIXME: we may need to store events per nodes in future...
        Returns None
        """
        raise NotImplementedError

    def save_event(self, event):
        """
        Called when a new event has been generated and should be saved in
        history.

        Returns None
        """
        raise NotImplementedError

    def read_event_history(self, start, end, evfilter):
        """
        Called when a client makes a history read request for events.

        Start time and end time are inclusive.
        Returns a list of Events and a continuation point which is None if
        all events are read, or the ServerTimestamp of an event that was
        left out of the result.
        """
        raise NotImplementedError

    def stop(self):
        """
        Called when the server shuts down.

        Can be used to close database connections etc.
        """
        raise NotImplementedError
71 1
72 1
73 1
# if you want to use an SQL based history uncomment this import and change the storage type of the history manager
74
# from opcua.server.history_sql import HistorySQLite
75 1
76 1
77 1
class HistoryDict(HistoryStorageInterface):
78
    """
79 1
    very minimal history backend storing data in memory using a Python dictionary
80
    """
81
    def __init__(self):
82 1
        self._datachanges = {}
83 1
        self._datachanges_period = {}
84 1
        self._events = {}
85 1
86 1
    def new_historized_node(self, node_id, period, count=0):
87 1
        self._datachanges[node_id] = []
88 1
        self._datachanges_period[node_id] = period, count
89
90 1
    def save_node_value(self, node_id, datavalue):
91 1
        data = self._datachanges[node_id]
92
        period, count = self._datachanges_period[node_id]
93 1
        data.append(datavalue)
94 1
        now = datetime.utcnow()
95 1
        if period:
96
            while now - data[0].ServerTimestamp > period:
97
                data.pop(0)
98
        if count and len(data) > count:
99 1
            data = data[-count:]
100
101 1
    def read_node_history(self, node_id, start, end, nb_values):
102
        cont = None
103 1
        if node_id not in self._datachanges:
104 1
            print("Error attempt to read history for a node which is not historized")
105 1
            return [], cont
106 1
        else:
107 1
            if end is None or ua.DateTimeMinValue:
108
                end = datetime.utcnow() + timedelta(days=1)
109 1
            if start is None:
110 1
                start = ua.DateTimeMinValue
111
            results = [dv for dv in self._datachanges[node_id] if start <= dv.ServerTimestamp <= end]
112 1
            if nb_values:
113
                if start > ua.DateTimeMinValue and len(results) > nb_values:
114
                    cont = results[nb_values + 1].ServerTimestamp
115 1
                    results = results[:nb_values]
116
                else:
117
                    results = results[-nb_values:]
118
            return results, cont
119 1
120 1
    def new_historized_event(self, event, period):
121 1
        self._events = []
122
123 1
    def save_event(self, event):
124 1
        raise NotImplementedError
125
126
    def read_event_history(self, start, end, evfilter):
127 1
        raise NotImplementedError
128
129
    def stop(self):
130
        pass
131 1
132 1
133 1
class SubHandler(object):
    """
    Subscription handler that forwards notifications to a history storage.
    """

    def __init__(self, storage):
        # storage must provide save_node_value() and save_event()
        self.storage = storage

    def datachange_notification(self, node, val, data):
        """
        Save the changed value of node in the storage.

        The full value object carried by data.monitored_item.Value is stored
        rather than the bare val argument, which is unused.
        """
        self.storage.save_node_value(node.nodeid, data.monitored_item.Value)

    def event_notification(self, event):
        """Save event in the storage."""
        self.storage.save_event(event)
142 1
143 1
144 1
class HistoryManager(object):
    """
    Historizes nodes through an internal subscription and answers history
    read/update requests using a pluggable storage backend.
    """

    def __init__(self, iserver):
        self.iserver = iserver
        # default backend keeps everything in memory
        self.storage = HistoryDict()
        # created on the first call to historize()
        self._sub = None
        # node -> handle returned by subscribe_data_change
        self._handlers = {}

    def set_storage(self, storage):
        """
        Replace the history backend.

        NOTE: historize() passes the storage that is current at its first call
        to the subscription handler, so a storage set after that point is used
        for reads but does not receive data changes. Call this before
        historizing any node.
        """
        self.storage = storage

    def _create_subscription(self, handler):
        """Create the subscription on the internal session that feeds handler."""
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = 10
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 0
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.iserver.isession, params, handler)

    def historize(self, node, period=timedelta(days=7), count=0):
        """
        Start saving the data changes of node in the storage.

        period and count are handed to the storage as retention settings.
        Raises ua.UaError if node is already historized.
        """
        if not self._sub:
            self._sub = self._create_subscription(SubHandler(self.storage))
        if node in self._handlers:
            raise ua.UaError("Node {} is already historized".format(node))
        self.storage.new_historized_node(node.nodeid, period, count)
        handler = self._sub.subscribe_data_change(node)
        self._handlers[node] = handler

    def dehistorize(self, node):
        """
        Stop saving the data changes of node.

        The handle is forgotten only after the unsubscribe call returned.
        """
        self._sub.unsubscribe(self._handlers[node])
        del self._handlers[node]

    def read_history(self, params):
        """
        Read history for a node
        This is the part AttributeService, but implemented as its own service
        since it requires more logic than other attribute service methods

        Returns one HistoryReadResult per entry of params.NodesToRead.
        """
        return [self._read_history(params.HistoryReadDetails, rv)
                for rv in params.NodesToRead]

    def _read_history(self, details, rv):
        """
        Read history for a single node and wrap it in a HistoryReadResult.

        Raw/modified and event reads are supported; any other details type
        gets a BadNotImplemented status.
        """
        result = ua.HistoryReadResult()
        if isinstance(details, ua.ReadRawModifiedDetails):
            if details.IsReadModified:
                result.HistoryData = ua.HistoryModifiedData()
                # we do not support modified history by design so we return what we have
            else:
                result.HistoryData = ua.HistoryData()
            dv, cont = self._read_datavalue_history(rv, details)
            result.HistoryData.DataValues = dv
            result.ContinuationPoint = cont

        elif isinstance(details, ua.ReadEventDetails):
            result.HistoryData = ua.HistoryEvent()
            # FIXME: filter is a cumbersome type, maybe transform it something easier
            # to handle for storage
            # the storage returns (events, continuation point); unpack it
            # instead of storing the whole tuple as the event list
            events, cont = self.storage.read_event_history(details.StartTime,
                                                           details.EndTime,
                                                           details.Filter)
            if cont:
                # same wire format as the data value path
                cont = ua.pack_datetime(cont)
            result.HistoryData.Events = events
            result.ContinuationPoint = cont
        else:
            # we do not currently support the other types, clients can process data themselves
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
        return result

    def _read_datavalue_history(self, rv, details):
        """
        Query the storage for the data values of rv.NodeId.

        Returns (values, continuation point); the continuation point is a
        packed datetime, or whatever falsy value the storage returned.
        """
        starttime = details.StartTime
        if rv.ContinuationPoint:
            # Spec says we should ignore details if cont point is present
            # but they also say we can use cont point as timestamp to enable stateless
            # implementation. This is contradictory, so we assume details is
            # send correctly with continuation point
            starttime = ua.unpack_datetime(utils.Buffer(rv.ContinuationPoint))

        dv, cont = self.storage.read_node_history(rv.NodeId,
                                                  starttime,
                                                  details.EndTime,
                                                  details.NumValuesPerNode)
        if cont:
            # the next read restarts from the last value actually returned
            cont = ua.pack_datetime(dv[-1].ServerTimestamp)
        # FIXME, parse index range and filter out if necessary
        # rv.IndexRange
        # rv.DataEncoding # xml or binary, seems spec say we can ignore that one
        return dv, cont

    def update_history(self, params):
        """
        Update history for a node
        This is the part AttributeService, but implemented as its own service
        since it requires more logic than other attribute service methods

        History is never rewritten: one BadNotWritable result is returned per
        entry of params.HistoryUpdateDetails.
        """
        results = []
        for _ in params.HistoryUpdateDetails:
            result = ua.HistoryUpdateResult()
            # we do not accept to rewrite history
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotWritable)
            results.append(result)
        return results

    def stop(self):
        """Shut down the storage backend."""
        self.storage.stop()
255