Completed
Pull Request — master (#180)
by
unknown
03:21
created

HistorySQLite.__init__()   A

Complexity

Conditions 1

Size

Total Lines 8

Duplication

Lines 2
Ratio 25 %

Code Coverage

Tests 7
CRAP Score 1

Importance

Changes 3
Bugs 1 Features 0
Metric Value
cc 1
c 3
b 1
f 0
dl 2
loc 8
ccs 7
cts 7
cp 1
crap 1
rs 9.4285
1 1
import logging
2 1
from datetime import timedelta
3 1
from datetime import datetime
4 1
from threading import Lock
5 1
from opcua import ua
6 1
from opcua.common.utils import Buffer
7 1
from opcua.server.history import HistoryStorageInterface
8 1
import sqlite3
9
10
11 1
class HistorySQLite(HistoryStorageInterface):
12
    """
13
    history backend which stores data values and object events in a SQLite database
14
    this backend is intended to only be accessed via OPC UA, therefore all UA Variants saved in
15
    the history database are in binary format (SQLite BLOBs)
16
    note that PARSE_DECLTYPES is active so certain data types (such as datetime) will not be BLOBs
17
    """
18
19 1
    def __init__(self, path="history.db"):
20 1
        self.logger = logging.getLogger(__name__)
21 1
        self._datachanges_period = {}
22 1
        self._db_file = path
23 1
        self._lock = Lock()
24 1
        self._event_fields = {}
25 View Code Duplication
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
26 1
        self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
27
28 1
    def new_historized_node(self, node_id, period, count=0):
29 1
        with self._lock:
30 1
            _c_new = self._conn.cursor()
31
32 1
            table = self._get_table_name(node_id)
33
34 1
            self._datachanges_period[node_id] = period, count
35
36
            # create a table for the node which will store attributes of the DataValue object
37
            # note: Value/VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
38 1
            try:
39 1
                _c_new.execute('CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL,'
40
                               ' ServerTimestamp TIMESTAMP,'
41
                               ' SourceTimestamp TIMESTAMP,'
42
                               ' StatusCode INTEGER,'
43
                               ' Value TEXT,'
44
                               ' VariantType TEXT,'
45
                               ' VariantBinary BLOB)'.format(tn=table))
46
47
            except sqlite3.Error as e:
48
                self.logger.info('Historizing SQL Table Creation Error for %s: %s', node_id, e)
49
50 1
            self._conn.commit()
51
52 1
    def save_node_value(self, node_id, datavalue):
53 1
        with self._lock:
54 1
            _c_sub = self._conn.cursor()
55
56 1
            table = self._get_table_name(node_id)
57
58
            # insert the data change into the database
59 1
            try:
60 1
                _c_sub.execute('INSERT INTO "{tn}" VALUES (NULL, ?, ?, ?, ?, ?, ?)'.format(tn=table),
61
                               (
62
                                   datavalue.ServerTimestamp,
63
                                   datavalue.SourceTimestamp,
64
                                   datavalue.StatusCode.value,
65
                                   str(datavalue.Value.Value),
66
                                   datavalue.Value.VariantType.name,
67
                                   sqlite3.Binary(datavalue.Value.to_binary())
68
                               )
69
                               )
70
            except sqlite3.Error as e:
71
                self.logger.error('Historizing SQL Insert Error for %s: %s', node_id, e)
72
73 1
            self._conn.commit()
74
75
            # get this node's period from the period dict and calculate the limit
76 1
            period, count = self._datachanges_period[node_id]
77
78 1
            if period:
79
                # after the insert, if a period was specified delete all records older than period
80
                date_limit = datetime.now() - period
81
82
                try:
83
                    _c_sub.execute('DELETE FROM "{tn}" WHERE ServerTimestamp < ?'.format(tn=table),
84
                                   (date_limit.isoformat(' '),))
85
                except sqlite3.Error as e:
86
                    self.logger.error('Historizing SQL Delete Old Data Error for %s: %s', node_id, e)
87
88
                self._conn.commit()
89
90 1
    def read_node_history(self, node_id, start, end, nb_values):
91 1
        with self._lock:
92 1
            _c_read = self._conn.cursor()
93
94 1
            order = "ASC"
95
96 1
            if start is None or start == ua.DateTimeMinValue:
97 1
                order = "DESC"
98 1
                start = ua.DateTimeMinValue
99
100 1
            if end is None or end == ua.DateTimeMinValue:
101 1
                end = datetime.utcnow() + timedelta(days=1)
102
103 1
            if start < end:
104 1
                start_time = start.isoformat(' ')
105 1
                end_time = end.isoformat(' ')
106
            else:
107 1
                order = "DESC"
108 1
                start_time = end.isoformat(' ')
109 1
                end_time = start.isoformat(' ')
110
111 1
            if nb_values:
112 1
                limit = nb_values + 1  # add 1 to the number of values for retrieving a continuation point
113
            else:
114 1
                limit = -1  # in SQLite a LIMIT of -1 returns all results
115
116 1
            table = self._get_table_name(node_id)
117
118 1
            cont = None
119 1
            results = []
120
121
            # select values from the database; recreate UA Variant from binary
122 1
            try:
123 1
                for row in _c_read.execute('SELECT * FROM "{tn}" WHERE "ServerTimestamp" BETWEEN ? AND ? '
124
                                           'ORDER BY "_Id" {dir} LIMIT ?'.format(tn=table, dir=order),
125
                                           (start_time, end_time, limit,)):
126
127
                    # rebuild the data value object
128 1
                    dv = ua.DataValue(ua.Variant.from_binary(Buffer(row[6])))
129 1
                    dv.ServerTimestamp = row[1]
130 1
                    dv.SourceTimestamp = row[2]
131 1
                    dv.StatusCode = ua.StatusCode(row[3])
132
133 1
                    results.append(dv)
134
135
            except sqlite3.Error as e:
136
                self.logger.error('Historizing SQL Read Error for %s: %s', node_id, e)
137
138 1
            if nb_values:
139 1
                if len(results) > nb_values:
140 1
                    cont = results[nb_values].ServerTimestamp
141
142 1 View Code Duplication
                results = results[:nb_values]
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
143
144 1
            return results, cont
145
146 1
    def new_historized_event(self, source_id, ev_fields, period):
147 1
        with self._lock:
148 1
            _c_new = self._conn.cursor()
149
150 1
            self._datachanges_period[source_id] = period
151 1
            self._event_fields[source_id] = ev_fields
152
153 1
            table = self._get_table_name(source_id)
154 1
            columns = self._get_event_columns(ev_fields)
155
156
            # create a table for the event which will store fields generated by the source object's events
157
            # note that _Timestamp is for SQL query, _EventTypeName is for debugging, be careful not to create event
158
            # properties with these names
159 1
            try:
160 1
                _c_new.execute('CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL, '
161
                               '_Timestamp TIMESTAMP, '
162
                               '_EventTypeName TEXT, '
163
                               '{co})'.format(tn=table, co=columns))
164
165
            except sqlite3.Error as e:
166
                self.logger.info('Historizing SQL Table Creation Error for events from %s: %s', source_id, e)
167
168 1
            self._conn.commit()
169
170 1
    def save_event(self, event):
171 1
        with self._lock:
172 1
            _c_sub = self._conn.cursor()
173
174 1
            table = self._get_table_name(event.SourceNode)
175 1
            columns, placeholders, evtup = self._format_event(event)
176 1
            event_type = event.EventType  # useful for troubleshooting database
177
178
            # insert the event into the database
179 1
            try:
180 1
                _c_sub.execute('INSERT INTO "{tn}" ("_Id", "_Timestamp", "_EventTypeName", {co}) '
181
                               'VALUES (NULL, "{ts}", "{et}", {pl})'.format(tn=table, co=columns, ts=event.Time, et=event_type, pl=placeholders), evtup)
182
183
            except sqlite3.Error as e:
184
                self.logger.error('Historizing SQL Insert Error for events from %s: %s', event.SourceNode, e)
185
186 1
            self._conn.commit()
187
188
            # get this node's period from the period dict and calculate the limit
189 1
            period = self._datachanges_period[event.SourceNode]
190
191 1
            if period:
192
                # after the insert, if a period was specified delete all records older than period
193
                date_limit = datetime.now() - period
194
195
                try:
196
                    _c_sub.execute('DELETE FROM "{tn}" WHERE Time < ?'.format(tn=table),
197
                                   (date_limit.isoformat(' '),))
198
                except sqlite3.Error as e:
199
                    self.logger.error('Historizing SQL Delete Old Data Error for events from %s: %s', event.SourceNode, e)
200
201
                self._conn.commit()
202
203 1
    def read_event_history(self, source_id, start, end, nb_values, evfilter):
204 1
        with self._lock:
205
206 1
            _c_read = self._conn.cursor()
207
208 1
            order = "ASC"
209
210 1
            if start is None or start == ua.DateTimeMinValue:
211 1
                order = "DESC"
212 1
                start = ua.DateTimeMinValue
213
214 1
            if end is None or end == ua.DateTimeMinValue:
215 1
                end = datetime.utcnow() + timedelta(days=1)
216
217 1
            if start < end:
218 1
                start_time = start.isoformat(' ')
219 1
                end_time = end.isoformat(' ')
220
            else:
221 1
                order = "DESC"
222 1
                start_time = end.isoformat(' ')
223 1
                end_time = start.isoformat(' ')
224
225 1
            if nb_values:
226 1
                limit = nb_values + 1  # add 1 to the number of values for retrieving a continuation point
227
            else:
228 1
                limit = -1  # in SQLite a LIMIT of -1 returns all results
229
230 1
            table = self._get_table_name(source_id)
231 1
            clauses = self._get_select_clauses(source_id, evfilter)
232
233 1
            cont = None
234 1
            cont_timestamps = []
235 1
            results = []
236
237
            # select events from the database; SQL select clause is built from EventFilter and available fields
238 1
            try:
239 1
                for row in _c_read.execute('SELECT "_Timestamp", {cl} FROM "{tn}" WHERE "_Timestamp" BETWEEN ? AND ? '
240
                                           'ORDER BY "_Id" {dir} LIMIT ?'.format(cl=clauses, tn=table, dir=order),
241
                                           (start_time, end_time, limit,)):
242
243
                    # place all the variants in the event field list object
244 1
                    hist_ev_field_list = ua.HistoryEventFieldList()
245 1
                    i = 0
246 1
                    for field in row:
247
                        # if the field is the _Timestamp column store it in a list used for getting the continuation
248 1
                        if i == 0:
249 1
                            cont_timestamps.append(field)
250
                        else:
251 1
                            if field is not None:
252 1
                                hist_ev_field_list.EventFields.append(ua.Variant.from_binary(Buffer(field)))
253
                            else:
254
                                hist_ev_field_list.EventFields.append(ua.Variant(None))
255 1
                        i += 1
256
257 1
                    results.append(hist_ev_field_list)
258
259
            except sqlite3.Error as e:
260
                self.logger.error('Historizing SQL Read Error events for node %s: %s', source_id, e)
261
262 1
            if nb_values:
263 1
                if len(results) > nb_values:  # start > ua.DateTimeMinValue and
264 1
                    cont = cont_timestamps[nb_values]
265
266 1
                results = results[:nb_values]
267
268 1
            return results, cont
269
270 1
    def _get_table_name(self, node_id):
271 1
        return str(node_id.NamespaceIndex) + '_' + str(node_id.Identifier)
272
273 1
    def _format_event(self, event_result):
274 1
        placeholders = []
275 1
        ev_fields = []
276 1
        ev_variant_binaries = []
277
278 1
        ev_variant_dict = event_result.get_event_props_as_fields_dict()
279
280
        # split dict into two synchronized lists which will be converted to SQL strings
281
        # note that the variants are converted to binary objects for storing in SQL BLOB format
282 1
        for field, variant in ev_variant_dict.items():
283 1
            placeholders.append('?')
284 1
            ev_fields.append(field)
285 1
            ev_variant_binaries.append(sqlite3.Binary(variant.to_binary()))
286
287 1
        return self._list_to_sql_str(ev_fields), self._list_to_sql_str(placeholders, False), tuple(ev_variant_binaries)
288
289 1
    def _get_event_columns(self, ev_fields):
290 1
        fields = []
291 1
        for field in ev_fields:
292 1
                fields.append(field + ' BLOB')
293 1
        return self._list_to_sql_str(fields, False)
294
295 1
    def _get_select_clauses(self, source_id, evfilter):
296 1
        s_clauses = []
297 1
        for select_clause in evfilter.SelectClauses:
298 1
            try:
299 1
                if not select_clause.BrowsePath:
300
                    s_clauses.append(select_clause.Attribute.name)
301
                else:
302 1
                    name = select_clause.BrowsePath[0].Name
303 1
                    s_clauses.append(name)
304
            except AttributeError:
305
                self.logger.warning('Historizing SQL OPC UA Select Clause Warning for node %s,'
306
                                    ' Clause: %s:', source_id, select_clause)
307
308
        # remove select clauses that the event type doesn't have; SQL will error because the column doesn't exist
309 1
        clauses = [x for x in s_clauses if self._check(source_id, x)]
310
311 1
        return self._list_to_sql_str(clauses)
312
313 1
    def _check(self, source_id, s_clause):
314 1
        if s_clause in self._event_fields[source_id]:
315 1
            return True
316
        else:
317
            return False
318
319 1
    def _list_to_sql_str(self, ls, quotes=True):
320 1
        sql_str = ''
321 1
        for item in ls:
322 1
            if quotes:
323 1
                sql_str += '"' + item + '", '
324
            else:
325 1
                sql_str += item + ', '
326 1
        return sql_str[:-2]  # remove trailing space and comma for SQL syntax
327
328 1
    def stop(self):
329 1
        with self._lock:
330 1
            self._conn.close()
331
            self.logger.info('Historizing SQL connection closed')
332