Completed
Pull Request — master (#509)
by
unknown
04:56
created

HistorySQLite.execute_sql_delete()   A

Complexity

Conditions 2

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2.0932

Importance

Changes 0
Metric Value
cc 2
c 0
b 0
f 0
dl 0
loc 9
ccs 5
cts 7
cp 0.7143
crap 2.0932
rs 9.6666
1 1
import logging
2 1
from datetime import timedelta
3 1
from datetime import datetime
4 1
from threading import Lock
5 1
import sqlite3
6
7 1
from opcua import ua
8 1
from opcua.ua.ua_binary import variant_from_binary, variant_to_binary
9 1
from opcua.common.utils import Buffer
10 1
from opcua.common import events
11 1
from opcua.server.history import HistoryStorageInterface
12
13
14 1
class HistorySQLite(HistoryStorageInterface):
15
    """
16
    history backend which stores data values and object events in a SQLite database
17
    this backend is intended to only be accessed via OPC UA, therefore all UA Variants saved in
18
    the history database are in binary format (SQLite BLOBs)
19
    note that PARSE_DECLTYPES is active so certain data types (such as datetime) will not be BLOBs
20
    """
21
22 1
    def __init__(self, path="history.db"):
23 1
        self.logger = logging.getLogger(__name__)
24 1
        self._datachanges_period = {}
25 1
        self._db_file = path
26 1
        self._lock = Lock()
27 1
        self._event_fields = {}
28
29 1
        self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
30
31 1
    def new_historized_node(self, node_id, period, count=0):
32 1
        with self._lock:
33 1
            _c_new = self._conn.cursor()
34
35 1
            table = self._get_table_name(node_id)
36
37 1
            self._datachanges_period[node_id] = period, count
38
39
            # create a table for the node which will store attributes of the DataValue object
40
            # note: Value/VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
41 1
            try:
42 1
                _c_new.execute('CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL,'
43
                               ' ServerTimestamp TIMESTAMP,'
44
                               ' SourceTimestamp TIMESTAMP,'
45
                               ' StatusCode INTEGER,'
46
                               ' Value TEXT,'
47
                               ' VariantType TEXT,'
48
                               ' VariantBinary BLOB)'.format(tn=table))
49
50
            except sqlite3.Error as e:
51
                self.logger.info('Historizing SQL Table Creation Error for %s: %s', node_id, e)
52
53 1
            self._conn.commit()
54
55 1
    def save_node_value(self, node_id, datavalue):
        """
        Append a data change to the node's table and apply the node's retention settings

        Args:
            node_id: id of the node the value belongs to
            datavalue: DataValue to store; its Variant is saved in binary form

        """
        with self._lock:
            cursor = self._conn.cursor()
            table = self._get_table_name(node_id)

            def purge(where, params):
                # delete the rows matching the WHERE clause; failures are logged, not raised
                statement = ('DELETE FROM "{tn}" WHERE ' + where).format(tn=table)

                try:
                    cursor.execute(statement, params)
                except sqlite3.Error as err:
                    self.logger.error('Historizing SQL Delete Old Data Error for %s: %s', node_id, err)

                self._conn.commit()

            # store the data change; an SQL failure is logged and the method carries on
            try:
                variant = datavalue.Value
                record = (
                    datavalue.ServerTimestamp,
                    datavalue.SourceTimestamp,
                    datavalue.StatusCode.value,
                    str(variant.Value),
                    variant.VariantType.name,
                    sqlite3.Binary(variant_to_binary(variant)),
                )
                cursor.execute('INSERT INTO "{tn}" VALUES (NULL, ?, ?, ?, ?, ?, ?)'.format(tn=table), record)
            except sqlite3.Error as err:
                self.logger.error('Historizing SQL Insert Error for %s: %s', node_id, err)

            self._conn.commit()

            # retention settings registered for this node
            period, count = self._datachanges_period[node_id]

            if period:
                # drop every record older than the retention period
                oldest_allowed = datetime.utcnow() - period
                purge('ServerTimestamp < ?', (oldest_allowed,))

            if count:
                # when more than count records exist, drop those carrying the oldest timestamp
                purge('ServerTimestamp = (SELECT CASE WHEN COUNT(*) > ? '
                      'THEN MIN(ServerTimestamp) ELSE NULL END FROM "{tn}")', (count,))
100
101 1
    def read_node_history(self, node_id, start, end, nb_values):
        """
        Read the stored data changes of a node inside a time window

        Args:
            node_id: id of the node whose history is read
            start: begin of the time window, forwarded to _get_bounds
            end: end of the time window, forwarded to _get_bounds
            nb_values: maximum number of values to return; a falsy value returns everything found

        Returns: List of DataValue objects, ServerTimestamp of the first value that did not fit
                 (continuation point) or None

        """
        select_stmt = ('SELECT * FROM "{tn}" WHERE "ServerTimestamp" BETWEEN ? AND ? '
                       'ORDER BY "_Id" {dir} LIMIT ?')

        with self._lock:
            cursor = self._conn.cursor()

            table = self._get_table_name(node_id)
            start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)

            values = []

            try:
                query = select_stmt.format(tn=table, dir=order)
                for row in cursor.execute(query, (start_time, end_time, limit,)):
                    # column 6 holds the binary Variant, columns 1-3 the timestamps and status code
                    value = ua.DataValue(variant_from_binary(Buffer(row[6])))
                    value.ServerTimestamp = row[1]
                    value.SourceTimestamp = row[2]
                    value.StatusCode = ua.StatusCode(row[3])

                    values.append(value)

            except sqlite3.Error as err:
                # rows converted before the failure are still returned
                self.logger.error('Historizing SQL Read Error for %s: %s', node_id, err)

            if not nb_values:
                return values, None

            # one extra row was requested; if it arrived its timestamp is the continuation point
            continuation = values[nb_values].ServerTimestamp if len(values) > nb_values else None
            return values[:nb_values], continuation
135
136 1
    def new_historized_event(self, source_id, evtypes, period, count=0):
137 1
        with self._lock:
138 1
            _c_new = self._conn.cursor()
139
140
            # get all fields for the event type nodes
141 1
            ev_fields = self._get_event_fields(evtypes)
142
143 1
            self._datachanges_period[source_id] = period
144 1
            self._event_fields[source_id] = ev_fields
145
146 1
            table = self._get_table_name(source_id)
147 1
            columns = self._get_event_columns(ev_fields)
148
149
            # create a table for the event which will store fields generated by the source object's events
150
            # note that _Timestamp is for SQL query, _EventTypeName is for debugging, be careful not to create event
151
            # properties with these names
152 1
            try:
153 1
                _c_new.execute(
154
                    'CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL, _Timestamp TIMESTAMP, _EventTypeName TEXT, {co})'
155
                    .format(tn=table, co=columns))
156
157 1
            except sqlite3.Error as e:
158 1
                self.logger.info('Historizing SQL Table Creation Error for events from %s: %s', source_id, e)
159
160 1
            self._conn.commit()
161
162 1
    def save_event(self, event):
        """
        Store an event in the table of its source node and purge events older than the retention period

        SQL failures are logged and not raised.

        Args:
            event: The event returned by the subscription; SourceNode selects the table, Time and
                   EventType fill the _Timestamp and _EventTypeName columns

        """
        with self._lock:
            _c_sub = self._conn.cursor()

            table = self._get_table_name(event.SourceNode)
            columns, placeholders, evtup = self._format_event(event)
            event_type = event.EventType  # useful for troubleshooting database

            # insert the event into the database
            # the timestamp and type name are bound as parameters instead of being formatted into
            # the statement as double-quoted text, which SQLite may resolve as an identifier and
            # which breaks on embedded quotes; str() keeps the stored text the same as formatting did
            try:
                _c_sub.execute(
                    'INSERT INTO "{tn}" ("_Id", "_Timestamp", "_EventTypeName", {co}) VALUES (NULL, ?, ?, {pl})'
                    .format(tn=table, co=columns, pl=placeholders),
                    (str(event.Time), str(event_type)) + evtup)

            except sqlite3.Error as e:
                self.logger.error('Historizing SQL Insert Error for events from %s: %s', event.SourceNode, e)

            self._conn.commit()

            # get this node's period from the period dict and calculate the limit
            period = self._datachanges_period[event.SourceNode]

            if period:
                # after the insert, if a period was specified delete all records older than period
                date_limit = datetime.utcnow() - period

                # compare against the text column _Timestamp: the Time column holds the binary
                # Variant (a BLOB), and SQLite never sorts a BLOB below a text value, so a
                # comparison on Time would not match any row
                try:
                    _c_sub.execute('DELETE FROM "{tn}" WHERE "_Timestamp" < ?'.format(tn=table),
                                   (date_limit.isoformat(' '),))
                except sqlite3.Error as e:
                    self.logger.error('Historizing SQL Delete Old Data Error for events from %s: %s',
                                      event.SourceNode, e)

                self._conn.commit()
196
197 1
    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        """
        Read the stored events of a source node inside a time window

        Args:
            source_id: id of the node that emitted the events
            start: begin of the time window, forwarded to _get_bounds
            end: end of the time window, forwarded to _get_bounds
            nb_values: maximum number of events to return; a falsy value returns everything found
            evfilter: EventFilter whose select clauses decide which columns are read

        Returns: List of events, _Timestamp of the first event that did not fit (continuation point) or None

        """
        select_stmt = 'SELECT "_Timestamp", {cl} FROM "{tn}" WHERE "_Timestamp" BETWEEN ? AND ? ORDER BY "_Id" {dir} LIMIT ?'

        with self._lock:
            cursor = self._conn.cursor()

            table = self._get_table_name(source_id)
            start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)
            clauses, clauses_str = self._get_select_clauses(source_id, evfilter)

            timestamps = []
            found = []

            # the select clause is built from the EventFilter and the fields known for this source
            try:
                query = select_stmt.format(cl=clauses_str, tn=table, dir=order)
                for row in cursor.execute(query, (start_time, end_time, limit)):
                    timestamps.append(row[0])

                    # the columns after the timestamp line up with the select clauses
                    fields = {}
                    for name, blob in zip(clauses, row[1:]):
                        fields[name] = ua.Variant(None) if blob is None else variant_from_binary(Buffer(blob))

                    found.append(events.Event.from_field_dict(fields))

            except sqlite3.Error as err:
                # events converted before the failure are still returned
                self.logger.error('Historizing SQL Read Error events for node %s: %s', source_id, err)

            if not nb_values:
                return found, None

            # one extra row was requested; if it arrived its timestamp is the continuation point
            continuation = timestamps[nb_values] if len(found) > nb_values else None
            return found[:nb_values], continuation
235
236 1
    def _get_table_name(self, node_id):
237 1
        return str(node_id.NamespaceIndex) + '_' + str(node_id.Identifier)
238
239 1
    def _get_event_fields(self, evtypes):
240
        """
241
        Get all fields from the event types that are to be historized
242
        Args:
243
            evtypes: List of event type nodes
244
245
        Returns: List of fields for all event types
246
247
        """
248
        # get all fields from the event types that are to be historized
249 1
        ev_aggregate_fields = []
250 1
        for event_type in evtypes:
251 1
            ev_aggregate_fields.extend((events.get_event_properties_from_type_node(event_type)))
252
253 1
        ev_fields = []
254 1
        for field in set(ev_aggregate_fields):
255
            ev_fields.append(field.get_display_name().Text)
256 1
        return ev_fields
257
258 1
    @staticmethod
    def _get_bounds(start, end, nb_values):
        """
        Translate an OPC UA history read request into SQL query bounds

        Args:
            start: begin of the time window; None or the windows epoch means unspecified
            end: end of the time window; None or the windows epoch means unspecified
            nb_values: maximum number of values requested; falsy for no limit

        Returns: lower bound, upper bound (both as 'YYYY-MM-DD HH:MM:SS' style strings),
                 sort direction ("ASC" or "DESC"), row limit for the query

        """
        epoch = ua.get_win_epoch()

        # an unspecified start reads backwards from the end of the window
        descending = start is None or start == epoch
        if descending:
            start = epoch

        # an unspecified end is placed one day after the current UTC time
        if end is None or end == epoch:
            end = datetime.utcnow() + timedelta(days=1)

        if start < end:
            lower, upper = start, end
        else:
            # reversed window: swap the bounds for BETWEEN and read backwards
            descending = True
            lower, upper = end, start

        start_time = lower.isoformat(' ')
        end_time = upper.isoformat(' ')

        # one extra row is requested to detect a continuation point;
        # in SQLite a LIMIT of -1 returns all results
        limit = nb_values + 1 if nb_values else -1

        return start_time, end_time, "DESC" if descending else "ASC", limit
283
284 1
    def _format_event(self, event):
285
        """
286
        Convert an event object triggered by the subscription into ordered lists for the SQL insert string
287
288
        Args:
289
            event: The event returned by the subscription
290
291
        Returns: List of event fields (SQL column names), List of '?' placeholders, Tuple of variant binaries
292
293
        """
294
        placeholders = []
295
        ev_variant_binaries = []
296
297
        ev_variant_dict = event.get_event_props_as_fields_dict()
298
        names = list(ev_variant_dict.keys())
299
        names.sort()  # sort alphabetically since dict is not sorted
300
301
        # split dict into two synchronized lists which will be converted to SQL strings
302
        # note that the variants are converted to binary objects for storing in SQL BLOB format
303
        for name in names:
304
            variant = ev_variant_dict[name]
305
            placeholders.append('?')
306
            ev_variant_binaries.append(sqlite3.Binary(variant_to_binary(variant)))
307
308
        return self._list_to_sql_str(names), self._list_to_sql_str(placeholders, False), tuple(ev_variant_binaries)
309
310 1
    def _get_event_columns(self, ev_fields):
311 1
        fields = []
312 1
        for field in ev_fields:
313
            fields.append(field + ' BLOB')
314 1
        return self._list_to_sql_str(fields, False)
315
316 1
    def _get_select_clauses(self, source_id, evfilter):
317 1
        s_clauses = []
318 1
        for select_clause in evfilter.SelectClauses:
319
            try:
320
                if not select_clause.BrowsePath:
321
                    s_clauses.append(select_clause.Attribute.name)
322
                else:
323
                    name = select_clause.BrowsePath[0].Name
324
                    s_clauses.append(name)
325
            except AttributeError:
326
                self.logger.warning('Historizing SQL OPC UA Select Clause Warning for node %s,'
327
                                    ' Clause: %s:', source_id, select_clause)
328
329
        # remove select clauses that the event type doesn't have; SQL will error because the column doesn't exist
330 1
        clauses = [x for x in s_clauses if x in self._event_fields[source_id]]
331 1
        return clauses, self._list_to_sql_str(clauses)
332
333 1
    @staticmethod
334 1
    def _list_to_sql_str(ls, quotes=True):
335 1
        sql_str = ''
336 1
        for item in ls:
337
            if quotes:
338
                sql_str += '"' + item + '", '
339
            else:
340
                sql_str += item + ', '
341 1
        return sql_str[:-2]  # remove trailing space and comma for SQL syntax
342
343 1
    def stop(self):
344 1
        with self._lock:
345 1
            self._conn.close()
346
            self.logger.info('Historizing SQL connection closed')
347