Completed
Pull Request — master (#204)
by Olivier
03:37
created

HistorySQLite.new_historized_node()   A

Complexity

Conditions 3

Size

Total Lines 23

Duplication

Lines 18
Ratio 78.26 %

Code Coverage

Tests 7
CRAP Score 3.0987

Importance

Changes 3
Bugs 1 Features 0
Metric Value
cc 3
c 3
b 1
f 0
dl 18
loc 23
ccs 7
cts 9
cp 0.7778
crap 3.0987
rs 9.0856
1 1
import logging
2 1
from datetime import timedelta
3 1
from datetime import datetime
4 1
from threading import Lock
5 1
import sqlite3
6 1
7 1
from opcua import ua
8 1
from opcua.common.utils import Buffer
9
from opcua.common import events
10
from opcua.server.history import HistoryStorageInterface
11 1
12
13
class HistorySQLite(HistoryStorageInterface):
14
    """
15
    history backend which stores data values and object events in a SQLite database
16
    this backend is intended to only be accessed via OPC UA, therefore all UA Variants saved in
17
    the history database are in binary format (SQLite BLOBs)
18
    note that PARSE_DECLTYPES is active so certain data types (such as datetime) will not be BLOBs
19 1
    """
20 1
21 1
    def __init__(self, path="history.db"):
22 1
        self.logger = logging.getLogger(__name__)
23 1
        self._datachanges_period = {}
24 1
        self._db_file = path
25 View Code Duplication
        self._lock = Lock()
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
26 1
        self._event_fields = {}
27
28 1
        self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
29 1
30 1
    def new_historized_node(self, node_id, period, count=0):
31
        with self._lock:
32 1
            _c_new = self._conn.cursor()
33
34 1
            table = self._get_table_name(node_id)
35
36
            self._datachanges_period[node_id] = period, count
37
38 1
            # create a table for the node which will store attributes of the DataValue object
39 1
            # note: Value/VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
40
            try:
41
                _c_new.execute('CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL,'
42
                               ' ServerTimestamp TIMESTAMP,'
43
                               ' SourceTimestamp TIMESTAMP,'
44
                               ' StatusCode INTEGER,'
45
                               ' Value TEXT,'
46
                               ' VariantType TEXT,'
47
                               ' VariantBinary BLOB)'.format(tn=table))
48
49
            except sqlite3.Error as e:
50 1
                self.logger.info('Historizing SQL Table Creation Error for %s: %s', node_id, e)
51
52 1
            self._conn.commit()
53 1
54 1
    def save_node_value(self, node_id, datavalue):
55
        with self._lock:
56 1
            _c_sub = self._conn.cursor()
57
58
            table = self._get_table_name(node_id)
59 1
60 1
            # insert the data change into the database
61
            try:
62
                _c_sub.execute('INSERT INTO "{tn}" VALUES (NULL, ?, ?, ?, ?, ?, ?)'.format(tn=table),
63
                               (
64
                                   datavalue.ServerTimestamp,
65
                                   datavalue.SourceTimestamp,
66
                                   datavalue.StatusCode.value,
67
                                   str(datavalue.Value.Value),
68
                                   datavalue.Value.VariantType.name,
69
                                   sqlite3.Binary(datavalue.Value.to_binary())
70
                               )
71
                              )
72
            except sqlite3.Error as e:
73 1
                self.logger.error('Historizing SQL Insert Error for %s: %s', node_id, e)
74
75
            self._conn.commit()
76 1
77
            # get this node's period from the period dict and calculate the limit
78 1
            period, count = self._datachanges_period[node_id]
79
80
            def execute_sql_delete(condition, args):
81
                query = ('DELETE FROM "{tn}" WHERE ' + condition).format(tn=table)
82
83
                try:
84
                    _c_sub.execute(query, args)
85
                except sqlite3.Error as e:
86
                    self.logger.error('Historizing SQL Delete Old Data Error for %s: %s', node_id, e)
87
88
                self._conn.commit()
89
90 1
            if period:
91 1
                # after the insert, if a period was specified delete all records older than period
92 1
                date_limit = datetime.utcnow() - period
93
                execute_sql_delete('ServerTimestamp < ?', (date_limit,))
94 1
95
            if count:
96 1
                # ensure that no more than count records are stored for the specified node
97 1
                execute_sql_delete('ServerTimestamp = (SELECT CASE WHEN COUNT(*) > ? '
98 1
                                   'THEN MIN(ServerTimestamp) ELSE NULL END FROM "{tn}")', (count,))
99
100 1
    def read_node_history(self, node_id, start, end, nb_values):
101 1
        with self._lock:
102
            _c_read = self._conn.cursor()
103 1
104 1
            table = self._get_table_name(node_id)
105 1
            start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)
106
107 1
            cont = None
108 1
            results = []
109 1
110
            # select values from the database; recreate UA Variant from binary
111 1
            try:
112 1
                for row in _c_read.execute('SELECT * FROM "{tn}" WHERE "ServerTimestamp" BETWEEN ? AND ? '
113
                                           'ORDER BY "_Id" {dir} LIMIT ?'.format(tn=table, dir=order),
114 1
                                           (start_time, end_time, limit,)):
115
116 1
                    # rebuild the data value object
117
                    dv = ua.DataValue(ua.Variant.from_binary(Buffer(row[6])))
118 1
                    dv.ServerTimestamp = row[1]
119 1
                    dv.SourceTimestamp = row[2]
120
                    dv.StatusCode = ua.StatusCode(row[3])
121
122 1
                    results.append(dv)
123 1
124
            except sqlite3.Error as e:
125
                self.logger.error('Historizing SQL Read Error for %s: %s', node_id, e)
126
127
            if nb_values:
128 1
                if len(results) > nb_values:
129 1
                    cont = results[nb_values].ServerTimestamp
130 1
131 1
                results = results[:nb_values]
132
133 1
            return results, cont
134
135
    def new_historized_event(self, source_id, ev_fields, period, count=0):
136
        with self._lock:
137
            _c_new = self._conn.cursor()
138 1
139 1
            self._datachanges_period[source_id] = period
140 1
            self._event_fields[source_id] = ev_fields
141
142 1 View Code Duplication
            table = self._get_table_name(source_id)
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
143
            columns = self._get_event_columns(ev_fields)
144 1
145
            # create a table for the event which will store fields generated by the source object's events
146 1
            # note that _Timestamp is for SQL query, _EventTypeName is for debugging, be careful not to create event
147 1
            # properties with these names
148 1
            try:
149
                _c_new.execute(
150 1
                    'CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL, _Timestamp TIMESTAMP, _EventTypeName TEXT, {co})'
151 1
                    .format(tn=table, co=columns))
152
153 1
            except sqlite3.Error as e:
154 1
                self.logger.info('Historizing SQL Table Creation Error for events from %s: %s', source_id, e)
155
156
            self._conn.commit()
157
158
    def save_event(self, event):
159 1
        with self._lock:
160 1
            _c_sub = self._conn.cursor()
161
162
            table = self._get_table_name(event.SourceNode)
163
            columns, placeholders, evtup = self._format_event(event)
164
            event_type = event.EventType  # useful for troubleshooting database
165
166
            # insert the event into the database
167
            try:
168 1
                _c_sub.execute(
169
                    'INSERT INTO "{tn}" ("_Id", "_Timestamp", "_EventTypeName", {co}) VALUES (NULL, "{ts}", "{et}", {pl})'
170 1
                    .format(tn=table, co=columns, ts=event.Time, et=event_type, pl=placeholders), evtup)
171 1
172 1
            except sqlite3.Error as e:
173
                self.logger.error('Historizing SQL Insert Error for events from %s: %s', event.SourceNode, e)
174 1
175 1
            self._conn.commit()
176 1
177
            # get this node's period from the period dict and calculate the limit
178
            period = self._datachanges_period[event.SourceNode]
179 1
180 1
            if period:
181
                # after the insert, if a period was specified delete all records older than period
182
                date_limit = datetime.now() - period
183
184
                try:
185
                    _c_sub.execute('DELETE FROM "{tn}" WHERE Time < ?'.format(tn=table),
186 1
                                   (date_limit.isoformat(' '),))
187
                except sqlite3.Error as e:
188
                    self.logger.error('Historizing SQL Delete Old Data Error for events from %s: %s', event.SourceNode, e)
189 1
190
                self._conn.commit()
191 1
192
    def read_event_history(self, source_id, start, end, nb_values, evfilter):
193
        with self._lock:
194
            _c_read = self._conn.cursor()
195
196
            table = self._get_table_name(source_id)
197
            start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)
198
            clauses, clauses_str = self._get_select_clauses(source_id, evfilter)
199
200
            cont = None
201
            cont_timestamps = []
202
            results = []
203 1
204 1
            # select events from the database; SQL select clause is built from EventFilter and available fields
205
            try:
206 1
                for row in _c_read.execute(
207
                        'SELECT "_Timestamp", {cl} FROM "{tn}" WHERE "_Timestamp" BETWEEN ? AND ? ORDER BY "_Id" {dir} LIMIT ?'
208 1
                        .format(cl=clauses_str, tn=table, dir=order), (start_time, end_time, limit)):
209
210 1
                    fdict = {}
211 1
                    cont_timestamps.append(row[0])
212 1
                    for i, field in enumerate(row[1:]):
213
                        if field is not None:
214 1
                            fdict[clauses[i]] = ua.Variant.from_binary(Buffer(field))
215 1
                        else:
216
                            fdict[clauses[i]] = ua.Variant(None)
217 1
218 1
                    results.append(events.Event.from_field_dict(fdict))
219 1
220
            except sqlite3.Error as e:
221 1
                self.logger.error('Historizing SQL Read Error events for node %s: %s', source_id, e)
222 1
223 1
            if nb_values:
224
                if len(results) > nb_values:  # start > ua.DateTimeMinValue and
225 1
                    cont = cont_timestamps[nb_values]
226 1
227
                results = results[:nb_values]
228 1
229
            return results, cont
230 1
231 1
    def _get_table_name(self, node_id):
232
        return str(node_id.NamespaceIndex) + '_' + str(node_id.Identifier)
233 1
234 1
    @staticmethod
235 1
    def _get_bounds(start, end, nb_values):
236
        order = "ASC"
237
238 1
        if start is None or start == ua.DateTimeMinValue:
239 1
            order = "DESC"
240
            start = ua.DateTimeMinValue
241
242
        if end is None or end == ua.DateTimeMinValue:
243
            end = datetime.utcnow() + timedelta(days=1)
244 1
245 1
        if start < end:
246 1
            start_time = start.isoformat(' ')
247
            end_time = end.isoformat(' ')
248 1
        else:
249 1
            order = "DESC"
250
            start_time = end.isoformat(' ')
251 1
            end_time = start.isoformat(' ')
252 1
253
        if nb_values:
254
            limit = nb_values + 1  # add 1 to the number of values for retrieving a continuation point
255 1
        else:
256
            limit = -1  # in SQLite a LIMIT of -1 returns all results
257 1
258
        return start_time, end_time, order, limit
259
260
    def _format_event(self, event_result):
261
        placeholders = []
262 1
        ev_variant_binaries = []
263 1
264 1
        ev_variant_dict = event_result.get_event_props_as_fields_dict()
265
        names = list(ev_variant_dict.keys())
266 1
        names.sort()  # sort alphabetically since dict is not sorted
267
268 1
        # split dict into two synchronized lists which will be converted to SQL strings
269
        # note that the variants are converted to binary objects for storing in SQL BLOB format
270 1
        for name in names:
271 1
            variant = ev_variant_dict[name]
272
            placeholders.append('?')
273 1
            ev_variant_binaries.append(sqlite3.Binary(variant.to_binary()))
274 1
275 1
        return self._list_to_sql_str(names), self._list_to_sql_str(placeholders, False), tuple(ev_variant_binaries)
276 1
277
    def _get_event_columns(self, ev_fields):
278 1
        fields = []
279
        for field in ev_fields:
280
            fields.append(field + ' BLOB')
281
        return self._list_to_sql_str(fields, False)
282 1
283 1
    def _get_select_clauses(self, source_id, evfilter):
284 1
        s_clauses = []
285 1
        for select_clause in evfilter.SelectClauses:
286
            try:
287 1
                if not select_clause.BrowsePath:
288
                    s_clauses.append(select_clause.Attribute.name)
289 1
                else:
290 1
                    name = select_clause.BrowsePath[0].Name
291 1
                    s_clauses.append(name)
292 1
            except AttributeError:
293 1
                self.logger.warning('Historizing SQL OPC UA Select Clause Warning for node %s,'
294
                                    ' Clause: %s:', source_id, select_clause)
295 1
296 1
        # remove select clauses that the event type doesn't have; SQL will error because the column doesn't exist
297 1
        clauses = [x for x in s_clauses if x in self._event_fields[source_id]]
298 1
        return clauses, self._list_to_sql_str(clauses)
299 1
300
    @staticmethod
301
    def _list_to_sql_str(ls, quotes=True):
302 1
        sql_str = ''
303 1
        for item in ls:
304
            if quotes:
305
                sql_str += '"' + item + '", '
306
            else:
307
                sql_str += item + ', '
308
        return sql_str[:-2]  # remove trailing space and comma for SQL syntax
309 1
310
    def stop(self):
311 1
        with self._lock:
312
            self._conn.close()
313
            self.logger.info('Historizing SQL connection closed')
314