Completed
Push — master ( 175840...68f055 )
by Olivier
03:53 queued 27s
created

HistorySQLite._get_select_clauses()   B

Complexity

Conditions 6

Size

Total Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 6.972

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 6
c 1
b 0
f 0
dl 0
loc 16
ccs 7
cts 10
cp 0.7
crap 6.972
rs 8
1 1
import logging
2 1
from datetime import timedelta
3 1
from datetime import datetime
4 1
from threading import Lock
5 1
import sqlite3
6 1
7 1
from opcua import ua
8 1
from opcua.common.utils import Buffer
9
from opcua.common import events
10
from opcua.server.history import HistoryStorageInterface
11 1
12
13
class HistorySQLite(HistoryStorageInterface):
14
    """
15
    history backend which stores data values and object events in a SQLite database
16
    this backend is intended to only be accessed via OPC UA, therefore all UA Variants saved in
17
    the history database are in binary format (SQLite BLOBs)
18
    note that PARSE_DECLTYPES is active so certain data types (such as datetime) will not be BLOBs
19 1
    """
20 1
21 1
    def __init__(self, path="history.db"):
22 1
        self.logger = logging.getLogger(__name__)
23 1
        self._datachanges_period = {}
24 1
        self._db_file = path
25 View Code Duplication
        self._lock = Lock()
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
26 1
        self._event_fields = {}
27
28 1
        self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
29 1
30 1
    def new_historized_node(self, node_id, period, count=0):
31
        with self._lock:
32 1
            _c_new = self._conn.cursor()
33
34 1
            table = self._get_table_name(node_id)
35
36
            self._datachanges_period[node_id] = period, count
37
38 1
            # create a table for the node which will store attributes of the DataValue object
39 1
            # note: Value/VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
40
            try:
41
                _c_new.execute('CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL,'
42
                               ' ServerTimestamp TIMESTAMP,'
43
                               ' SourceTimestamp TIMESTAMP,'
44
                               ' StatusCode INTEGER,'
45
                               ' Value TEXT,'
46
                               ' VariantType TEXT,'
47
                               ' VariantBinary BLOB)'.format(tn=table))
48
49
            except sqlite3.Error as e:
50 1
                self.logger.info('Historizing SQL Table Creation Error for %s: %s', node_id, e)
51
52 1
            self._conn.commit()
53 1
54 1
    def save_node_value(self, node_id, datavalue):
55
        with self._lock:
56 1
            _c_sub = self._conn.cursor()
57
58
            table = self._get_table_name(node_id)
59 1
60 1
            # insert the data change into the database
61
            try:
62
                _c_sub.execute('INSERT INTO "{tn}" VALUES (NULL, ?, ?, ?, ?, ?, ?)'.format(tn=table),
63
                               (
64
                                   datavalue.ServerTimestamp,
65
                                   datavalue.SourceTimestamp,
66
                                   datavalue.StatusCode.value,
67
                                   str(datavalue.Value.Value),
68
                                   datavalue.Value.VariantType.name,
69
                                   sqlite3.Binary(datavalue.Value.to_binary())
70
                               )
71
                              )
72
            except sqlite3.Error as e:
73 1
                self.logger.error('Historizing SQL Insert Error for %s: %s', node_id, e)
74
75
            self._conn.commit()
76 1
77
            # get this node's period from the period dict and calculate the limit
78 1
            period, count = self._datachanges_period[node_id]
79
80
            def executeDeleteStatement(condition, args):
81
                query = ('DELETE FROM "{tn}" WHERE ' + condition).format(tn=table)
82
83
                try:
84
                    _c_sub.execute(query, args)
85
                except sqlite3.Error as e:
86
                    self.logger.error('Historizing SQL Delete Old Data Error for %s: %s', node_id, e)
87
88
                self._conn.commit()
89
90 1
            if period:
91 1
                # after the insert, if a period was specified delete all records older than period
92 1
                date_limit = datetime.utcnow() - period
93
                executeDeleteStatement('ServerTimestamp < ?', (date_limit,))
94 1
95
            if count:
96 1
                #ensure that no more than count records are stored for the specified node
97 1
                executeDeleteStatement('ServerTimestamp = (SELECT CASE WHEN COUNT(*) > ? THEN MIN(ServerTimestamp) ELSE NULL END FROM "{tn}")', (count,))
98 1
99
    def read_node_history(self, node_id, start, end, nb_values):
100 1
        with self._lock:
101 1
            _c_read = self._conn.cursor()
102
103 1
            order = "ASC"
104 1
105 1
            if start is None or start == ua.DateTimeMinValue:
106
                order = "DESC"
107 1
                start = ua.DateTimeMinValue
108 1
109 1
            if end is None or end == ua.DateTimeMinValue:
110
                end = datetime.utcnow() + timedelta(days=1)
111 1
112 1
            if start < end:
113
                start_time = start.isoformat(' ')
114 1
                end_time = end.isoformat(' ')
115
            else:
116 1
                order = "DESC"
117
                start_time = end.isoformat(' ')
118 1
                end_time = start.isoformat(' ')
119 1
120
            if nb_values:
121
                limit = nb_values + 1  # add 1 to the number of values for retrieving a continuation point
122 1
            else:
123 1
                limit = -1  # in SQLite a LIMIT of -1 returns all results
124
125
            table = self._get_table_name(node_id)
126
127
            cont = None
128 1
            results = []
129 1
130 1
            # select values from the database; recreate UA Variant from binary
131 1
            try:
132
                for row in _c_read.execute('SELECT * FROM "{tn}" WHERE "ServerTimestamp" BETWEEN ? AND ? '
133 1
                                           'ORDER BY "_Id" {dir} LIMIT ?'.format(tn=table, dir=order),
134
                                           (start_time, end_time, limit,)):
135
136
                    # rebuild the data value object
137
                    dv = ua.DataValue(ua.Variant.from_binary(Buffer(row[6])))
138 1
                    dv.ServerTimestamp = row[1]
139 1
                    dv.SourceTimestamp = row[2]
140 1
                    dv.StatusCode = ua.StatusCode(row[3])
141
142 1 View Code Duplication
                    results.append(dv)
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
143
144 1
            except sqlite3.Error as e:
145
                self.logger.error('Historizing SQL Read Error for %s: %s', node_id, e)
146 1
147 1
            if nb_values:
148 1
                if len(results) > nb_values:
149
                    cont = results[nb_values].ServerTimestamp
150 1
151 1
                results = results[:nb_values]
152
153 1
            return results, cont
154 1
155
    def new_historized_event(self, source_id, ev_fields, period, count):
156
        with self._lock:
157
            _c_new = self._conn.cursor()
158
159 1
            self._datachanges_period[source_id] = period
160 1
            self._event_fields[source_id] = ev_fields
161
162
            table = self._get_table_name(source_id)
163
            columns = self._get_event_columns(ev_fields)
164
165
            # create a table for the event which will store fields generated by the source object's events
166
            # note that _Timestamp is for SQL query, _EventTypeName is for debugging, be careful not to create event
167
            # properties with these names
168 1
            try:
169
                _c_new.execute(
170 1
                    'CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL, _Timestamp TIMESTAMP, _EventTypeName TEXT, {co})'
171 1
                    .format(tn=table, co=columns))
172 1
173
            except sqlite3.Error as e:
174 1
                self.logger.info('Historizing SQL Table Creation Error for events from %s: %s', source_id, e)
175 1
176 1
            self._conn.commit()
177
178
    def save_event(self, event):
179 1
        with self._lock:
180 1
            _c_sub = self._conn.cursor()
181
182
            table = self._get_table_name(event.SourceNode)
183
            columns, placeholders, evtup = self._format_event(event)
184
            event_type = event.EventType  # useful for troubleshooting database
185
186 1
            # insert the event into the database
187
            try:
188
                _c_sub.execute(
189 1
                    'INSERT INTO "{tn}" ("_Id", "_Timestamp", "_EventTypeName", {co}) VALUES (NULL, "{ts}", "{et}", {pl})'
190
                    .format(tn=table, co=columns, ts=event.Time, et=event_type, pl=placeholders), evtup)
191 1
192
            except sqlite3.Error as e:
193
                self.logger.error('Historizing SQL Insert Error for events from %s: %s', event.SourceNode, e)
194
195
            self._conn.commit()
196
197
            # get this node's period from the period dict and calculate the limit
198
            period = self._datachanges_period[event.SourceNode]
199
200
            if period:
201
                # after the insert, if a period was specified delete all records older than period
202
                date_limit = datetime.now() - period
203 1
204 1
                try:
205
                    _c_sub.execute('DELETE FROM "{tn}" WHERE Time < ?'.format(tn=table),
206 1
                                   (date_limit.isoformat(' '),))
207
                except sqlite3.Error as e:
208 1
                    self.logger.error('Historizing SQL Delete Old Data Error for events from %s: %s', event.SourceNode, e)
209
210 1
                self._conn.commit()
211 1
212 1
    def read_event_history(self, source_id, start, end, nb_values, evfilter):
213
        with self._lock:
214 1
215 1
            _c_read = self._conn.cursor()
216
217 1
            order = "ASC"
218 1
219 1
            if start is None or start == ua.DateTimeMinValue:
220
                order = "DESC"
221 1
                start = ua.DateTimeMinValue
222 1
223 1
            if end is None or end == ua.DateTimeMinValue:
224
                end = datetime.utcnow() + timedelta(days=1)
225 1
226 1
            if start < end:
227
                start_time = start.isoformat(' ')
228 1
                end_time = end.isoformat(' ')
229
            else:
230 1
                order = "DESC"
231 1
                start_time = end.isoformat(' ')
232
                end_time = start.isoformat(' ')
233 1
234 1
            if nb_values:
235 1
                limit = nb_values + 1  # add 1 to the number of values for retrieving a continuation point
236
            else:
237
                limit = -1  # in SQLite a LIMIT of -1 returns all results
238 1
239 1
            table = self._get_table_name(source_id)
240
            clauses, clauses_str = self._get_select_clauses(source_id, evfilter)
241
242
            cont = None
243
            cont_timestamps = []
244 1
            results = []
245 1
246 1
            # select events from the database; SQL select clause is built from EventFilter and available fields
247
            try:
248 1
                for row in _c_read.execute(
249 1
                        'SELECT "_Timestamp", {cl} FROM "{tn}" WHERE "_Timestamp" BETWEEN ? AND ? ORDER BY "_Id" {dir} LIMIT ?'
250
                        .format(cl=clauses_str, tn=table, dir=order), (start_time, end_time, limit)):
251 1
252 1
                    fdict = {}
253
                    cont_timestamps.append(row[0])
254
                    for i, field in enumerate(row[1:]):
255 1
                        if field is not None:
256
                            fdict[clauses[i]] = ua.Variant.from_binary(Buffer(field))
257 1
                        else:
258
                            fdict[clauses[i]] = ua.Variant(None)
259
260
                    results.append(events.EventResult.from_field_dict(fdict))
261
262 1
            except sqlite3.Error as e:
263 1
                self.logger.error('Historizing SQL Read Error events for node %s: %s', source_id, e)
264 1
265
            if nb_values:
266 1
                if len(results) > nb_values:  # start > ua.DateTimeMinValue and
267
                    cont = cont_timestamps[nb_values]
268 1
269
                results = results[:nb_values]
270 1
271 1
            return results, cont
272
273 1
    def _get_table_name(self, node_id):
274 1
        return str(node_id.NamespaceIndex) + '_' + str(node_id.Identifier)
275 1
276 1
    def _format_event(self, event_result):
277
        placeholders = []
278 1
        ev_variant_binaries = []
279
280
        ev_variant_dict = event_result.get_event_props_as_fields_dict()
281
        names = list(ev_variant_dict.keys())
282 1
        names.sort()  # sort alphabatically since dict is not sorted
283 1
284 1
        # split dict into two synchronized lists which will be converted to SQL strings
285 1
        # note that the variants are converted to binary objects for storing in SQL BLOB format
286
        for name in names:
287 1
            variant = ev_variant_dict[name]
288
            placeholders.append('?')
289 1
            ev_variant_binaries.append(sqlite3.Binary(variant.to_binary()))
290 1
291 1
        return self._list_to_sql_str(names), self._list_to_sql_str(placeholders, False), tuple(ev_variant_binaries)
292 1
293 1
    def _get_event_columns(self, ev_fields):
294
        fields = []
295 1
        for field in ev_fields:
296 1
            fields.append(field + ' BLOB')
297 1
        return self._list_to_sql_str(fields, False)
298 1
299 1
    def _get_select_clauses(self, source_id, evfilter):
300
        s_clauses = []
301
        for select_clause in evfilter.SelectClauses:
302 1
            try:
303 1
                if not select_clause.BrowsePath:
304
                    s_clauses.append(select_clause.Attribute.name)
305
                else:
306
                    name = select_clause.BrowsePath[0].Name
307
                    s_clauses.append(name)
308
            except AttributeError:
309 1
                self.logger.warning('Historizing SQL OPC UA Select Clause Warning for node %s,'
310
                                    ' Clause: %s:', source_id, select_clause)
311 1
312
        # remove select clauses that the event type doesn't have; SQL will error because the column doesn't exist
313 1
        clauses = [x for x in s_clauses if x in self._event_fields[source_id]]
314 1
        return clauses, self._list_to_sql_str(clauses)
315 1
316
    def _list_to_sql_str(self, ls, quotes=True):
317
        sql_str = ''
318
        for item in ls:
319 1
            if quotes:
320 1
                sql_str += '"' + item + '", '
321 1
            else:
322 1
                sql_str += item + ', '
323 1
        return sql_str[:-2]  # remove trailing space and comma for SQL syntax
324
325 1
    def stop(self):
326 1
        with self._lock:
327
            self._conn.close()
328
            self.logger.info('Historizing SQL connection closed')
329