Completed
Pull Request — master (#189)
by
unknown
03:24
created

HistorySQLite._get_select_clauses()   B

Complexity

Conditions 6

Size

Total Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 6.0163

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 6
c 1
b 0
f 0
dl 0
loc 17
ccs 12
cts 13
cp 0.9231
crap 6.0163
rs 8
1 1
import logging
2 1
from datetime import timedelta
3 1
from datetime import datetime
4 1
from threading import Lock
5 1
from opcua import ua
6 1
from opcua.common.utils import Buffer
7 1
from opcua.server.history import HistoryStorageInterface
8 1
import sqlite3
9
10
11 1
class HistorySQLite(HistoryStorageInterface):
12
    """
13
    history backend which stores data values and object events in a SQLite database
14
    this backend is intended to only be accessed via OPC UA, therefore all UA Variants saved in
15
    the history database are in binary format (SQLite BLOBs)
16
    note that PARSE_DECLTYPES is active so certain data types (such as datetime) will not be BLOBs
17
    """
18
19 1
    def __init__(self, path="history.db"):
20 1
        self.logger = logging.getLogger(__name__)
21 1
        self._datachanges_period = {}
22 1
        self._db_file = path
23 1
        self._lock = Lock()
24 1
        self._event_fields = {}
25 View Code Duplication
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
26 1
        self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
27
28 1
    def new_historized_node(self, node_id, period, count=0):
29 1
        with self._lock:
30 1
            _c_new = self._conn.cursor()
31
32 1
            table = self._get_table_name(node_id)
33
34 1
            self._datachanges_period[node_id] = period, count
35
36
            # create a table for the node which will store attributes of the DataValue object
37
            # note: Value/VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
38 1
            try:
39 1
                _c_new.execute('CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL,'
40
                               ' ServerTimestamp TIMESTAMP,'
41
                               ' SourceTimestamp TIMESTAMP,'
42
                               ' StatusCode INTEGER,'
43
                               ' Value TEXT,'
44
                               ' VariantType TEXT,'
45
                               ' VariantBinary BLOB)'.format(tn=table))
46
47
            except sqlite3.Error as e:
48
                self.logger.info('Historizing SQL Table Creation Error for %s: %s', node_id, e)
49
50 1
            self._conn.commit()
51
52 1
    def save_node_value(self, node_id, datavalue):
53 1
        with self._lock:
54 1
            _c_sub = self._conn.cursor()
55
56 1
            table = self._get_table_name(node_id)
57
58
            # insert the data change into the database
59 1
            try:
60 1
                _c_sub.execute('INSERT INTO "{tn}" VALUES (NULL, ?, ?, ?, ?, ?, ?)'.format(tn=table),
61
                               (
62
                                   datavalue.ServerTimestamp,
63
                                   datavalue.SourceTimestamp,
64
                                   datavalue.StatusCode.value,
65
                                   str(datavalue.Value.Value),
66
                                   datavalue.Value.VariantType.name,
67
                                   sqlite3.Binary(datavalue.Value.to_binary())
68
                               )
69
                               )
70
            except sqlite3.Error as e:
71
                self.logger.error('Historizing SQL Insert Error for %s: %s', node_id, e)
72
73 1
            self._conn.commit()
74
75
            # get this node's period from the period dict and calculate the limit
76 1
            period, count = self._datachanges_period[node_id]
77
78 1
            def executeDeleteStatement(condition, args):
79
                query = ('DELETE FROM "{tn}" WHERE ' + condition).format(tn=table)
80
81
                try:
82
                    _c_sub.execute(query, args)
83
                except sqlite3.Error as e:
84
                    self.logger.error('Historizing SQL Delete Old Data Error for %s: %s', node_id, e)
85
86
                self._conn.commit()
87
88
            if period:
89
                # after the insert, if a period was specified delete all records older than period
90 1
                date_limit = datetime.utcnow() - period
91 1
                executeDeleteStatement('ServerTimestamp < ?', (date_limit,))
92 1
93
            if count:
94 1
                # ensure that no more than count records are stored for the specified node
95
                executeDeleteStatement('ServerTimestamp = (SELECT CASE WHEN COUNT(*) > ? THEN MIN(ServerTimestamp) ELSE NULL END FROM "{tn}")', (count,))
96 1
97 1
    def read_node_history(self, node_id, start, end, nb_values):
98 1
        with self._lock:
99
            _c_read = self._conn.cursor()
100 1
101 1
            table = self._get_table_name(node_id)
102
            start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)
103 1
104 1
            cont = None
105 1
            results = []
106
107 1
            # select values from the database; recreate UA Variant from binary
108 1
            try:
109 1
                for row in _c_read.execute('SELECT * FROM "{tn}" WHERE "ServerTimestamp" BETWEEN ? AND ? '
110
                                           'ORDER BY "_Id" {dir} LIMIT ?'.format(tn=table, dir=order),
111 1
                                           (start_time, end_time, limit,)):
112 1
113
                    # rebuild the data value object
114 1
                    dv = ua.DataValue(ua.Variant.from_binary(Buffer(row[6])))
115
                    dv.ServerTimestamp = row[1]
116 1
                    dv.SourceTimestamp = row[2]
117
                    dv.StatusCode = ua.StatusCode(row[3])
118 1
119 1
                    results.append(dv)
120
121
            except sqlite3.Error as e:
122 1
                self.logger.error('Historizing SQL Read Error for %s: %s', node_id, e)
123 1
124
            if nb_values:
125
                if len(results) > nb_values:
126
                    cont = results[nb_values].ServerTimestamp
127
128 1
                results = results[:nb_values]
129 1
130 1
            return results, cont
131 1
132
    def new_historized_event(self, source_id, ev_fields, period):
133 1
        with self._lock:
134
            _c_new = self._conn.cursor()
135
136
            self._datachanges_period[source_id] = period
137
            self._event_fields[source_id] = ev_fields
138 1
139 1
            table = self._get_table_name(source_id)
140 1
            columns = self._get_event_columns(ev_fields)
141
142 1 View Code Duplication
            # create a table for the event which will store fields generated by the source object's events
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
143
            # note that _Timestamp is for SQL query, _EventTypeName is for debugging, be careful not to create event
144 1
            # properties with these names
145
            try:
146 1
                _c_new.execute('CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL, '
147 1
                               '_Timestamp TIMESTAMP, '
148 1
                               '_EventTypeName TEXT, '
149
                               '{co})'.format(tn=table, co=columns))
150 1
151 1
            except sqlite3.Error as e:
152
                self.logger.info('Historizing SQL Table Creation Error for events from %s: %s', source_id, e)
153 1
154 1
            self._conn.commit()
155
156
    def save_event(self, event):
157
        with self._lock:
158
            _c_sub = self._conn.cursor()
159 1
160 1
            table = self._get_table_name(event.SourceNode)
161
            columns, placeholders, evtup = self._format_event(event)
162
            event_type = event.EventType  # useful for troubleshooting database
163
164
            # insert the event into the database
165
            try:
166
                _c_sub.execute('INSERT INTO "{tn}" ("_Id", "_Timestamp", "_EventTypeName", {co}) '
167
                               'VALUES (NULL, "{ts}", "{et}", {pl})'.format(tn=table, co=columns, ts=event.Time, et=event_type, pl=placeholders), evtup)
168 1
169
            except sqlite3.Error as e:
170 1
                self.logger.error('Historizing SQL Insert Error for events from %s: %s', event.SourceNode, e)
171 1
172 1
            self._conn.commit()
173
174 1
            # get this node's period from the period dict and calculate the limit
175 1
            period = self._datachanges_period[event.SourceNode]
176 1
177
            if period:
178
                # after the insert, if a period was specified delete all records older than period
179 1
                date_limit = datetime.now() - period
180 1
181
                try:
182
                    _c_sub.execute('DELETE FROM "{tn}" WHERE Time < ?'.format(tn=table),
183
                                   (date_limit.isoformat(' '),))
184
                except sqlite3.Error as e:
185
                    self.logger.error('Historizing SQL Delete Old Data Error for events from %s: %s', event.SourceNode, e)
186 1
187
                self._conn.commit()
188
189 1
    def read_event_history(self, source_id, start, end, nb_values, evfilter):
190
        with self._lock:
191 1
            _c_read = self._conn.cursor()
192
193
            table = self._get_table_name(source_id)
194
            start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)
195
            clauses = self._get_select_clauses(source_id, evfilter)
196
197
            cont = None
198
            cont_timestamps = []
199
            results = []
200
201
            # select events from the database; SQL select clause is built from EventFilter and available fields
202
            try:
203 1
                for row in _c_read.execute('SELECT "_Timestamp", {cl} FROM "{tn}" WHERE "_Timestamp" BETWEEN ? AND ? '
204 1
                                           'ORDER BY "_Id" {dir} LIMIT ?'.format(cl=clauses, tn=table, dir=order),
205
                                           (start_time, end_time, limit,)):
206 1
207
                    # place all the variants in the event field list object
208 1
                    hist_ev_field_list = ua.HistoryEventFieldList()
209
                    i = 0
210 1
                    for field in row:
211 1
                        # if the field is the _Timestamp column store it in a list used for getting the continuation
212 1
                        if i == 0:
213
                            cont_timestamps.append(field)
214 1
                        else:
215 1
                            if field is not None:
216
                                hist_ev_field_list.EventFields.append(ua.Variant.from_binary(Buffer(field)))
217 1
                            else:
218 1
                                hist_ev_field_list.EventFields.append(ua.Variant(None))
219 1
                        i += 1
220
221 1
                    results.append(hist_ev_field_list)
222 1
223 1
            except sqlite3.Error as e:
224
                self.logger.error('Historizing SQL Read Error events for node %s: %s', source_id, e)
225 1
226 1
            if nb_values:
227
                if len(results) > nb_values:  # start > ua.DateTimeMinValue and
228 1
                    cont = cont_timestamps[nb_values]
229
230 1
                results = results[:nb_values]
231 1
232
            return results, cont
233 1
234 1
    @staticmethod
235 1
    def _get_table_name(node_id):
236
        return str(node_id.NamespaceIndex) + '_' + str(node_id.Identifier)
237
238 1
    @staticmethod
239 1
    def _get_bounds(start, end, nb_values):
240
        order = "ASC"
241
242
        if start is None or start == ua.DateTimeMinValue:
243
            order = "DESC"
244 1
            start = ua.DateTimeMinValue
245 1
246 1
        if end is None or end == ua.DateTimeMinValue:
247
            end = datetime.utcnow() + timedelta(days=1)
248 1
249 1
        if start < end:
250
            start_time = start.isoformat(' ')
251 1
            end_time = end.isoformat(' ')
252 1
        else:
253
            order = "DESC"
254
            start_time = end.isoformat(' ')
255 1
            end_time = start.isoformat(' ')
256
257 1
        if nb_values:
258
            limit = nb_values + 1  # add 1 to the number of values for retrieving a continuation point
259
        else:
260
            limit = -1  # in SQLite a LIMIT of -1 returns all results
261
262 1
        return start_time, end_time, order, limit
263 1
264 1
    def _format_event(self, event_result):
265
        placeholders = []
266 1
        ev_fields = []
267
        ev_variant_binaries = []
268 1
269
        ev_variant_dict = event_result.get_event_props_as_fields_dict()
270 1
271 1
        # split dict into two synchronized lists which will be converted to SQL strings
272
        # note that the variants are converted to binary objects for storing in SQL BLOB format
273 1
        for field, variant in ev_variant_dict.items():
274 1
            placeholders.append('?')
275 1
            ev_fields.append(field)
276 1
            ev_variant_binaries.append(sqlite3.Binary(variant.to_binary()))
277
278 1
        return self._list_to_sql_str(ev_fields), self._list_to_sql_str(placeholders, False), tuple(ev_variant_binaries)
279
280
    def _get_event_columns(self, ev_fields):
281
        fields = []
282 1
        for field in ev_fields:
283 1
                fields.append(field + ' BLOB')
284 1
        return self._list_to_sql_str(fields, False)
285 1
286
    def _get_select_clauses(self, source_id, evfilter):
287 1
        s_clauses = []
288
        for select_clause in evfilter.SelectClauses:
289 1
            try:
290 1
                if not select_clause.BrowsePath:
291 1
                    s_clauses.append(select_clause.Attribute.name)
292 1
                else:
293 1
                    name = select_clause.BrowsePath[0].Name
294
                    s_clauses.append(name)
295 1
            except AttributeError:
296 1
                self.logger.warning('Historizing SQL OPC UA Select Clause Warning for node %s,'
297 1
                                    ' Clause: %s:', source_id, select_clause)
298 1
299 1
        # remove select clauses that the event type doesn't have; SQL will error because the column doesn't exist
300
        clauses = [x for x in s_clauses if self._check(source_id, x)]
301
302 1
        return self._list_to_sql_str(clauses)
303 1
304
    def _check(self, source_id, s_clause):
305
        if s_clause in self._event_fields[source_id]:
306
            return True
307
        else:
308
            return False
309 1
310
    @staticmethod
311 1
    def _list_to_sql_str(ls, quotes=True):
312
        sql_str = ''
313 1
        for item in ls:
314 1
            if quotes:
315 1
                sql_str += '"' + item + '", '
316
            else:
317
                sql_str += item + ', '
318
        return sql_str[:-2]  # remove trailing space and comma for SQL syntax
319 1
320 1
    def stop(self):
321 1
        with self._lock:
322 1
            self._conn.close()
323
            self.logger.info('Historizing SQL connection closed')
324