Completed
Pull Request — master (#155)
by
unknown
02:27
created

opcua.server.HistorySQLite.new_historized_node()   B

Complexity

Conditions 3

Size

Total Lines 26

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 14
CRAP Score 3.0026
Metric Value
cc 3
dl 0
loc 26
ccs 14
cts 15
cp 0.9333
crap 3.0026
rs 8.8571
1 1
from datetime import timedelta
2 1
from datetime import datetime
3
4 1
from opcua import Subscription
5 1
from opcua import ua
6
7
import sqlite3
8 1
9
10
class HistoryStorageInterface(object):
    """
    Interface of a history backend.
    Must be implemented by backends.

    Every method raises NotImplementedError until a backend overrides it.
    """

    def new_historized_node(self, node, period, count=0):
        """
        Called when a new node is to be historized
        Returns None
        """
        raise NotImplementedError

    def save_node_value(self, node, datavalue):
        """
        Called when the value of a historized node has changed and should be saved in history
        Returns None
        """
        raise NotImplementedError

    def read_node_history(self, node, start, end, nb_values):
        """
        Called when a client makes a history read request for a node
        If start or end is missing then nb_values is used to limit the query
        nb_values is the max number of values to read. Ignored if 0
        Start time and end time are inclusive
        Returns a list of DataValues and a continuation point which
        is None if all nodes are read or the ServerTimeStamp of the last rejected DataValue
        """
        raise NotImplementedError

    def new_historized_event(self, event, period):
        """
        Called when historization of events is enabled on server side
        FIXME: we may need to store events per nodes in future...
        Returns None
        """
        raise NotImplementedError

    def save_event(self, event):
        """
        Called when a new event has been generated and should be saved in history
        Returns None
        """
        raise NotImplementedError

    def read_event_history(self, start, end, evfilter):
        """
        Called when a client makes a history read request for events
        Start time and end time are inclusive
        Returns a list of Events and a continuation point which
        is None if all events are read or the ServerTimeStamp of the last rejected event
        """
        raise NotImplementedError

    def stop(self):
        """
        Called when the server shuts down
        Can be used to close database connections etc.
        """
        raise NotImplementedError
72 1
73
74 1
class HistoryDict(HistoryStorageInterface):
    """
    very minimal history backend storing data in memory using a Python dictionary
    """

    def __init__(self):
        # node id -> list of stored DataValues, in insertion order
        self._datachanges = {}
        # node id -> (period, count) retention settings given to new_historized_node
        self._datachanges_period = {}
        self._events = {}

    def new_historized_node(self, node, period, count=0):
        """
        Start keeping history for node (keyed by node.nodeid).
        period is the maximum age of kept values, count the maximum number
        of kept values; a falsy value disables the corresponding limit.
        Any history previously stored for the same node id is discarded.
        """
        node_id = node.nodeid
        self._datachanges[node_id] = []
        self._datachanges_period[node_id] = period, count

    def save_node_value(self, node, datavalue):
        """
        Append datavalue to the history of node, then enforce the period
        and count limits registered for that node.
        Raises KeyError if the node was never historized.
        """
        node_id = node.nodeid
        data = self._datachanges[node_id]
        period, count = self._datachanges_period[node_id]
        data.append(datavalue)
        now = datetime.now()
        if period:
            # "data and" guards against emptying the list when even the value
            # that was just appended is older than the period
            while data and now - data[0].ServerTimestamp > period:
                data.pop(0)
        if count and len(data) > count:
            # trim in place: rebinding the local name would leave the stored list untouched
            del data[:-count]

    def read_node_history(self, node, start, end, nb_values):
        """
        Return (values, continuation point) for the stored history of node.
        node must expose its id as node.NodeId (capitalised, unlike the
        node.nodeid used by the write-side methods).
        Values whose ServerTimestamp lies in [start, end] are selected.
        When nb_values is set and an explicit start was given, the first
        nb_values values are returned and the continuation point is the
        ServerTimestamp of the first value left out; otherwise the last
        nb_values values are returned and the continuation point is None.
        """
        node_id = node.NodeId
        cont = None
        if node_id not in self._datachanges:
            print("Error attempt to read history for a node which is not historized")
            return [], cont
        if end is None:
            end = datetime.now() + timedelta(days=1)
        if start is None:
            start = ua.DateTimeMinValue
        results = [dv for dv in self._datachanges[node_id] if start <= dv.ServerTimestamp <= end]
        if nb_values:
            if start > ua.DateTimeMinValue and len(results) > nb_values:
                # index nb_values is the first value not returned; it always
                # exists here because len(results) > nb_values
                cont = results[nb_values].ServerTimestamp
                results = results[:nb_values]
            else:
                results = results[-nb_values:]
        return results, cont

    def new_historized_event(self, event, period):
        """
        Reset the event store; event and period are currently not used.
        """
        self._events = []

    def save_event(self, event):
        raise NotImplementedError

    def read_event_history(self, start, end, evfilter):
        raise NotImplementedError

    def stop(self):
        """
        Nothing to release for the in-memory backend.
        """
        pass
131 1
132 1
133 1
class HistorySQLite(HistoryStorageInterface):
    """
    very minimal history backend storing data in SQLite database

    One table is created per historized node; each row stores the
    attributes of one DataValue.
    """
    # FIXME: need to check on if sql_conn.commit() should be inside try block; and if .rollback() needs to be except

    def __init__(self):
        # table name -> retention period given to new_historized_node
        self._datachanges_period = {}
        self._events = {}
        self._db_file = "history.db"

        # SQL objects must be accessed in a single thread.
        # new_historized_node uses a short-lived connection of its own, so no
        # connection object is kept for it.

        # subscriptions are in another thread so it needs its own sqlite connection object
        self._conn_sub = None
        self._c_sub = None

        # FIXME: no idea what thread the read values happen in, just make a new conn object for now
        self._conn_read = None
        self._c_read = None

    @staticmethod
    def _table_name(nodeid):
        """
        Build the table name / period key for a node id:
        "<NamespaceIndex>_<Identifier>"
        """
        return str(nodeid.NamespaceIndex) + '_' + str(nodeid.Identifier)

    def new_historized_node(self, node, period, count=0):
        """
        Register the retention period of node and create its history table.
        count is accepted for interface compatibility but not used by this backend.
        SQLite errors (for instance when the table already exists) are
        printed and otherwise ignored.
        """
        node_id = self._table_name(node.nodeid)
        self._datachanges_period[node_id] = period

        sql_type = self._get_sql_type(node)

        # one connection per call, always closed, so nothing is leaked and no
        # closed connection object is left behind on self
        conn = sqlite3.connect(self._db_file)
        try:
            # create a table for the node which will store attributes of the DataValue object
            conn.cursor().execute('CREATE TABLE "{tn}" (ServerTimestamp TIMESTAMP,'
                                  ' SourceTimestamp TIMESTAMP,'
                                  ' StatusCode INTEGER,'
                                  ' Value {type},'
                                  ' VariantType INTEGER)'.format(tn=node_id, type=sql_type))
            conn.commit()
        except sqlite3.Error as e:
            print(node_id, 'Historizing SQL Table Creation Error:', e)
        finally:
            conn.close()

    def save_node_value(self, node, datavalue):
        """
        Insert datavalue into the table of node, then delete rows older than
        the period registered for that node (skipped when the period is falsy).
        Raises KeyError if the node was never historized.
        """
        if self._conn_sub is None:
            self._conn_sub = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES)
            self._c_sub = self._conn_sub.cursor()

        node_id = self._table_name(node.nodeid)

        # insert the data change into the database
        try:
            self._c_sub.execute('INSERT INTO "{tn}" VALUES (?, ?, ?, ?, ?)'.format(tn=node_id),
                                (datavalue.ServerTimestamp,
                                 datavalue.SourceTimestamp,
                                 datavalue.StatusCode.value,
                                 datavalue.Value.Value,
                                 datavalue.Value.VariantType.value))
        except sqlite3.Error as e:
            print(node_id, 'Historizing SQL Insert Error:', e)

        # get this node's period from the period dict and calculate the limit
        period = self._datachanges_period[node_id]
        if period:
            date_limit = datetime.now() - period

            # after the insert, delete all values older than period
            try:
                self._c_sub.execute('DELETE FROM "{tn}" WHERE ServerTimestamp < ?'.format(tn=node_id),
                                    (date_limit.isoformat(' '),))
            except sqlite3.Error as e:
                print(node_id, 'Historizing SQL Delete Old Data Error:', e)

        self._conn_sub.commit()

    def read_node_history(self, node, start, end, nb_values):
        """
        Return (values, continuation point) for the stored history of node.
        node must expose its id as node.NodeId.
        Rows whose ServerTimestamp lies in [start, end] are returned oldest
        first, at most nb_values of them (no limit when nb_values is 0).
        The continuation point is always None for this backend.
        SQLite errors are printed and the values read so far are returned.
        """
        if self._conn_read is None:
            self._conn_read = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES)
            self._c_read = self._conn_read.cursor()

        if end is None:
            end = datetime.now() + timedelta(days=1)
        if start is None:
            start = ua.DateTimeMinValue

        node_id = self._table_name(node.NodeId)
        cont = None
        results = []

        start_time = start.isoformat(' ')
        end_time = end.isoformat(' ')

        # a negative LIMIT means "no upper bound" in SQLite, whereas LIMIT 0
        # would return nothing although nb_values == 0 means "ignored"
        limit = nb_values if nb_values else -1

        # select values from the database
        try:
            for row in self._c_read.execute('SELECT * FROM "{tn}" WHERE "ServerTimestamp" BETWEEN ? AND ? '
                                            'ORDER BY "ServerTimestamp" '
                                            'LIMIT ?'.format(tn=node_id), (start_time, end_time, limit,)):

                dv = ua.DataValue(ua.Variant(row[3], self._get_variant_type(row[4])))
                dv.ServerTimestamp = row[0]
                dv.SourceTimestamp = row[1]
                dv.StatusCode = ua.StatusCode(row[2])

                results.append(dv)

        except sqlite3.Error as e:
            print(node_id, 'Historizing SQL Read Error:', e)

        return results, cont

    def new_historized_event(self, event, period):
        raise NotImplementedError

    def save_event(self, event):
        raise NotImplementedError

    def read_event_history(self, start, end, evfilter):
        raise NotImplementedError

    # convert the node UA Variant type to an SQL supported type
    # FIXME: this could lead to lost precision! Better way to store the data? Add custom types to SQL?
    def _get_sql_type(self, node):
        """
        Return the SQL column type used to store values of node, chosen
        from the identifier of node.get_data_type().
        """
        node_type = node.get_data_type()
        # see object_ids.py
        if node_type.Identifier in (10, 11,):  # Float, Double
            return 'REAL'
        elif node_type.Identifier in (4, 5, 6, 7, 8, 9,):  # Ints
            return "INT"
        elif node_type.Identifier in (12,):  # String
            return "TEXT"
        else:
            return "NULL"

    # convert the OPC UA variant identifier stored in SQL back to a UA Variant
    # FIXME: is there a util method someplace for getting this?
    def _get_variant_type(self, identifier):
        """
        Return the ua.VariantType matching the integer identifier stored in
        the VariantType column, or None for an identifier that is not handled.
        """
        # looked up by value (==), not identity: "is" on ints only works by
        # accident of small-integer caching
        variant_types = {
            4: ua.VariantType.Int16,
            5: ua.VariantType.UInt16,
            6: ua.VariantType.Int32,
            7: ua.VariantType.UInt32,
            8: ua.VariantType.Int64,
            9: ua.VariantType.UInt64,
            10: ua.VariantType.Float,
            11: ua.VariantType.Double,
            12: ua.VariantType.String,
        }
        return variant_types.get(identifier)

    # close connections to the history database when the server stops
    def stop(self):
        pass
        # FIXME: Should close the database connections when the server stops, but because server.stop() is called
        # FIXME: on a different thread than the SQL conn object, no idea how to do this at the moment
297
298
299
class SubHandler(object):
    """
    Subscription handler forwarding notifications to a history storage backend.
    """

    def __init__(self, storage):
        # backend receiving the notifications; must provide save_node_value and save_event
        self.storage = storage

    def datachange_notification(self, node, val, data):
        """
        Save the value carried by data.monitored_item for node.
        val is not used: the full value object is taken from the monitored item.
        """
        self.storage.save_node_value(node, data.monitored_item.Value)  # SHOULD NOT GET NODE ID HERE

    def event_notification(self, event):
        """
        Save event in the storage backend.
        """
        self.storage.save_event(event)
308
309
310
class HistoryManager(object):
    """
    Server-side entry point for history: subscribes to historized nodes,
    forwards their changes to a storage backend and answers history
    read / update requests.
    """

    def __init__(self, iserver):
        self.iserver = iserver
        self.storage = HistorySQLite()  # HistoryDict() HistorySQLite()
        # internal subscription, created lazily by the first historize() call
        self._sub = None
        # node -> handle returned by subscribe_data_change
        self._handlers = {}

    def set_storage(self, storage):
        """
        Replace the storage backend.
        NOTE(review): a subscription created by an earlier historize() call
        keeps the SubHandler built around the previous storage - confirm
        callers only switch storage before historizing.
        """
        self.storage = storage

    def _create_subscription(self, handler):
        """
        Create the internal subscription delivering notifications to handler.
        """
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = 10
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 0
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.iserver.isession, params, handler)

    def historize(self, node, period=timedelta(days=7), count=0):
        """
        Start historizing data changes of node.
        period and count are passed to the storage backend as retention limits.
        Raises ua.UaError if the node is already historized.
        """
        if not self._sub:
            self._sub = self._create_subscription(SubHandler(self.storage))
        if node in self._handlers:
            raise ua.UaError("Node {} is allready historized".format(node))
        self.storage.new_historized_node(node, period, count)  # SHOULD NOT GET NODE ID HERE
        handler = self._sub.subscribe_data_change(node)
        self._handlers[node] = handler

    def dehistorize(self, node):
        """
        Stop historizing node.
        Raises KeyError if the node is not historized.
        """
        self._sub.unsubscribe(self._handlers[node])
        del(self._handlers[node])

    def read_history(self, params):
        """
        Read history for a node
        This is the part AttributeService, but implemented as its own service
        since it requires more logic than other attribute service methods
        Returns one HistoryReadResult per entry of params.NodesToRead
        """
        return [self._read_history(params.HistoryReadDetails, rv) for rv in params.NodesToRead]

    def _read_history(self, details, rv):
        """
        read history for a node
        """
        result = ua.HistoryReadResult()
        if isinstance(details, ua.ReadRawModifiedDetails):
            if details.IsReadModified:
                result.HistoryData = ua.HistoryModifiedData()
                # we do not support modified history by design so we return what we have
            else:
                result.HistoryData = ua.HistoryData()
            dv, cont = self._read_datavalue_history(rv, details)
            result.HistoryData.DataValues = dv
            result.ContinuationPoint = cont

        elif isinstance(details, ua.ReadEventDetails):
            result.HistoryData = ua.HistoryEvent()
            # FIXME: filter is a cumbersome type, maybe transform it something easier
            # to handle for storage
            # the storage interface returns (events, continuation point), so the
            # pair must be unpacked rather than stored whole in Events
            events, cont = self.storage.read_event_history(details.StartTime,
                                                           details.EndTime,
                                                           details.Filter)
            result.HistoryData.Events = events
            if cont:
                # the interface documents cont as a timestamp; pack it the same
                # way as the data value continuation point
                result.ContinuationPoint = ua.pack_datetime(cont)
        else:
            # we do not currently support the other types, clients can process data themselves
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
        return result

    def _read_datavalue_history(self, rv, details):
        """
        Read the data value history described by rv and details from storage.
        Returns (values, continuation point); the continuation point is the
        packed ServerTimestamp of the last returned value, or the falsy value
        returned by the storage when there is nothing more to read.
        """
        starttime = details.StartTime
        if rv.ContinuationPoint:
            # Spec says we should ignore details if cont point is present
            # but they also say we can use cont point as timestamp to enable stateless
            # implementation. This is contradictory, so we assume details is
            # send correctly with continuation point
            # starttime = bytes_to_datetime(rv.ContinuationPoint)
            starttime = ua.unpack_datetime(rv.ContinuationPoint)

        dv, cont = self.storage.read_node_history(rv,  # SHOULD NOT GET NODE ID HERE!
                                                  starttime,
                                                  details.EndTime,
                                                  details.NumValuesPerNode)
        if cont:
            # cont = datetime_to_bytes(dv[-1].ServerTimestamp)
            cont = ua.pack_datetime(dv[-1].ServerTimestamp)
        # FIXME, parse index range and filter out if necessary
        # rv.IndexRange
        # rv.DataEncoding # xml or binary, seems spec say we can ignore that one
        return dv, cont

    def update_history(self, params):
        """
        Update history for a node
        This is the part AttributeService, but implemented as its own service
        since it requires more logic than other attribute service methods
        Returns one HistoryUpdateResult with status BadNotWritable per entry
        of params.HistoryUpdateDetails
        """
        results = []
        for _ in params.HistoryUpdateDetails:
            result = ua.HistoryUpdateResult()
            # we do not accept to rewrite history
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotWritable)
            # append the per-request result, not the result list itself
            results.append(result)
        return results

    def stop(self):
        """
        Stop the storage backend.
        """
        self.storage.stop()
421