Completed
Pull Request — master (#156)
by
unknown
02:37
created

opcua.server.HistorySQLite.read_node_history()   B

Complexity

Conditions 5

Size

Total Lines 34

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 5
dl 0
loc 34
rs 8.0894
1
from datetime import timedelta
2
from datetime import datetime
3
4
from opcua import ua, Node
5
from opcua.server.history import HistoryStorageInterface
6
7
import struct
8
import sqlite3
9
10
11
class HistorySQLite(HistoryStorageInterface):
    """
    Very minimal history backend storing data changes in a SQLite database.

    Every historized node gets its own table, named after the node id
    ("<NamespaceIndex>_<Identifier>").  Each row holds the attributes of one
    DataValue: ServerTimestamp, SourceTimestamp, StatusCode, a text rendering
    of the value, the VariantType code and the binary-packed value.

    SQLite errors are reported with print() and otherwise swallowed, so a
    failing statement never propagates to the caller.  Event history is not
    implemented.
    """
    # FIXME: need to check on if sql_conn.commit() should be inside try block; and if .rollback() needs to be except

    # struct format character for every supported VariantType code
    # (codes follow the OPC UA built-in type ids, see object_ids.py)
    _PACK_TYPES = {
        1: '?',   # Boolean
        2: 'b',   # SByte (signed 8 bit)
        3: 'B',   # Byte (unsigned 8 bit)
        4: 'h',   # Int16
        5: 'H',   # UInt16
        6: 'i',   # Int32
        7: 'I',   # UInt32
        8: 'q',   # Int64
        9: 'Q',   # UInt64
        10: 'f',  # Float
        11: 'd',  # Double
        12: 's',  # String (stored as UTF-8 bytes, see _pack_value)
    }

    def __init__(self):
        """Open (or create) the "history.db" database file."""
        self._datachanges_period = {}
        self._events = {}
        self._db_file = "history.db"

        # check_same_thread=False: the connection is used from whichever
        # thread calls into the storage interface
        self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)

    def new_historized_node(self, node, period, count=0):
        """
        Register a node for historizing and create its table.

        node: the node to historize
        period: timedelta of history to keep; rows older than this are
                deleted on every save
        count: accepted for interface compatibility, not used by this backend
        """
        _c_new = self._conn.cursor()

        node_id = self._get_table_name(node)

        self._datachanges_period[node_id] = period

        # create a table for the node which will store attributes of the DataValue object
        try:
            _c_new.execute('CREATE TABLE "{tn}" (ServerTimestamp TIMESTAMP,'
                           ' SourceTimestamp TIMESTAMP,'
                           ' StatusCode INTEGER,'
                           ' Value TEXT,'
                           ' VariantType INTEGER,'
                           ' ValueBinary BLOB)'.format(tn=self._escape_table_name(node_id)))

        except sqlite3.Error as e:
            print(node_id, 'Historizing SQL Table Creation Error:', e)

        self._conn.commit()

    def save_node_value(self, node, datavalue):
        """
        Store one DataValue for a historized node, then purge expired rows.

        Raises TypeError if the value's variant type cannot be packed and
        KeyError if the node was never passed to new_historized_node().
        """
        _c_sub = self._conn.cursor()

        node_id = self._get_table_name(node)
        table = self._escape_table_name(node_id)

        value_blob = self._pack_value(datavalue)

        # insert the data change into the database
        try:
            _c_sub.execute('INSERT INTO "{tn}" VALUES (?, ?, ?, ?, ?, ?)'.format(tn=table),
                           (datavalue.ServerTimestamp,
                            datavalue.SourceTimestamp,
                            datavalue.StatusCode.value,
                            str(datavalue.Value.Value),
                            datavalue.Value.VariantType.value,
                            value_blob))
        except sqlite3.Error as e:
            print(node_id, 'Historizing SQL Insert Error:', e)

        # get this node's period from the period dict
        period = self._datachanges_period[node_id]

        # A period of None cannot be subtracted from a datetime; skip the
        # purge instead of crashing after the insert.
        # NOTE(review): presumably None means "keep forever" - confirm
        # against the caller.
        if period is not None:
            # NOTE(review): compares against local time; confirm that
            # ServerTimestamp is not recorded in UTC.
            date_limit = datetime.now() - period

            # after the insert, delete all values older than period
            try:
                _c_sub.execute('DELETE FROM "{tn}" WHERE ServerTimestamp < ?'.format(tn=table),
                               (date_limit.isoformat(' '),))
            except sqlite3.Error as e:
                print(node_id, 'Historizing SQL Delete Old Data Error:', e)

        self._conn.commit()

    def read_node_history(self, node, start, end, nb_values):
        """
        Read stored DataValues of a node between start and end (inclusive).

        start: datetime or None (None means ua.DateTimeMinValue)
        end: datetime or None (None means one day after the current time)
        nb_values: maximum number of values to return; 0 or None means
                   no limit

        Returns (results, cont): the list of ua.DataValue objects and a
        continuation point, which is always None in this backend.
        """
        _c_read = self._conn.cursor()

        if end is None:
            end = datetime.now() + timedelta(days=1)
        if start is None:
            start = ua.DateTimeMinValue

        node_id = self._get_table_name(node)

        cont = None
        results = []

        start_time = start.isoformat(' ')
        end_time = end.isoformat(' ')

        # "LIMIT 0" would return no rows at all; SQLite treats a negative
        # LIMIT as "no upper bound"
        limit = nb_values if nb_values else -1

        # select values from the database
        try:
            query = ('SELECT * FROM "{tn}" WHERE "ServerTimestamp" BETWEEN ? AND ? '
                     'LIMIT ?'.format(tn=self._escape_table_name(node_id)))

            for row in _c_read.execute(query, (start_time, end_time, limit,)):
                # row: ServerTimestamp, SourceTimestamp, StatusCode, Value,
                #      VariantType, ValueBinary
                value = self._unpack_value(row[4], row[5])

                dv = ua.DataValue(ua.Variant(value, ua.VariantType(row[4])))
                dv.ServerTimestamp = row[0]
                dv.SourceTimestamp = row[1]
                dv.StatusCode = ua.StatusCode(row[2])

                results.append(dv)

        except sqlite3.Error as e:
            print(node_id, 'Historizing SQL Read Error:', e)

        return results, cont

    def new_historized_event(self, event, period):
        """Not supported by this backend."""
        raise NotImplementedError

    def save_event(self, event):
        """Not supported by this backend."""
        raise NotImplementedError

    def read_event_history(self, start, end, evfilter):
        """Not supported by this backend."""
        raise NotImplementedError

    def _pack_value(self, datavalue):
        """
        Return the binary representation of a DataValue's value.

        Raises TypeError when the variant type is not supported.
        """
        variant_code = datavalue.Value.VariantType.value
        value = datavalue.Value.Value

        pack_type = self._get_pack_type(variant_code)
        if pack_type is None:
            raise TypeError('Historizing of variant type {0} is not supported '
                            'by the SQLite storage interface'.format(variant_code))

        if pack_type == 's':
            # a bare 's' format packs a single byte only; store the whole
            # string as UTF-8 instead
            if not isinstance(value, bytes):
                value = value.encode('utf-8')
            return value

        return struct.pack(pack_type, value)

    def _unpack_value(self, variant_code, binary,):
        """
        Rebuild the Python value from its binary representation.

        Raises TypeError when the variant type is not supported.
        """
        pack_type = self._get_pack_type(variant_code)
        if pack_type is None:
            raise TypeError('Historizing of variant type {0} is not supported '
                            'by the SQLite storage interface'.format(variant_code))

        if pack_type == 's':
            return bytes(binary).decode('utf-8')

        # struct.unpack always returns a tuple, even for a single item
        return struct.unpack(pack_type, binary)[0]

    def _get_pack_type(self, variant_code):
        """Return the struct format for a VariantType code, None if unsupported."""
        return self._PACK_TYPES.get(variant_code)

    def _get_table_name(self, node):
        """
        Build the table name "<NamespaceIndex>_<Identifier>" for a node.

        Accepts a Node or a ua.HistoryReadValueId; returns None for any
        other type.
        """
        if isinstance(node, Node):
            return str(node.nodeid.NamespaceIndex) + '_' + str(node.nodeid.Identifier)
        if isinstance(node, ua.HistoryReadValueId):
            return str(node.NodeId.NamespaceIndex) + '_' + str(node.NodeId.Identifier)
        return None

    @staticmethod
    def _escape_table_name(name):
        """
        Escape a table name for use inside a double-quoted SQL identifier.

        Node identifiers may be arbitrary strings, so embedded double
        quotes are doubled to keep them from terminating the identifier.
        """
        return str(name).replace('"', '""')

    # close connections to the history database when the server stops
    def stop(self):
        """Close the connection to the history database."""
        self._conn.close()
166