Completed
Push — master ( 4652a3...eeb0ed )
by Olivier
03:54
created

HistorySQLite.save_node_value()   C

Complexity

Conditions 7

Size

Total Lines 45

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 14
CRAP Score 9.3554

Importance

Changes 3
Bugs 1 Features 0
Metric Value
cc 7
c 3
b 1
f 0
dl 0
loc 45
ccs 14
cts 22
cp 0.6364
crap 9.3554
rs 5.5

1 Method

Rating   Name   Duplication   Size   Complexity  
A HistorySQLite.execute_sql_delete() 0 9 2
1 1
import logging
2 1
from datetime import timedelta
3 1
from datetime import datetime
4 1
from threading import Lock
5 1
import sqlite3
6 1
7 1
from opcua import ua
8 1
from opcua.common.utils import Buffer
9
from opcua.common import events
10
from opcua.server.history import HistoryStorageInterface
11 1
12
13
class HistorySQLite(HistoryStorageInterface):
14
    """
15
    history backend which stores data values and object events in a SQLite database
16
    this backend is intended to only be accessed via OPC UA, therefore all UA Variants saved in
17
    the history database are in binary format (SQLite BLOBs)
18
    note that PARSE_DECLTYPES is active so certain data types (such as datetime) will not be BLOBs
19 1
    """
20 1
21 1
    def __init__(self, path="history.db"):
22 1
        self.logger = logging.getLogger(__name__)
23 1
        self._datachanges_period = {}
24 1
        self._db_file = path
25
        self._lock = Lock()
26 1
        self._event_fields = {}
27
28 1
        self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
29 1
30 1
    def new_historized_node(self, node_id, period, count=0):
31
        with self._lock:
32 1
            _c_new = self._conn.cursor()
33
34 1
            table = self._get_table_name(node_id)
35
36
            self._datachanges_period[node_id] = period, count
37
38 1
            # create a table for the node which will store attributes of the DataValue object
39 1
            # note: Value/VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
40
            try:
41
                _c_new.execute('CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL,'
42
                               ' ServerTimestamp TIMESTAMP,'
43
                               ' SourceTimestamp TIMESTAMP,'
44
                               ' StatusCode INTEGER,'
45
                               ' Value TEXT,'
46
                               ' VariantType TEXT,'
47
                               ' VariantBinary BLOB)'.format(tn=table))
48
49
            except sqlite3.Error as e:
50 1
                self.logger.info('Historizing SQL Table Creation Error for %s: %s', node_id, e)
51
52 1
            self._conn.commit()
53 1
54 1
    def save_node_value(self, node_id, datavalue):
55
        with self._lock:
56 1
            _c_sub = self._conn.cursor()
57
58
            table = self._get_table_name(node_id)
59 1
60 1
            # insert the data change into the database
61
            try:
62
                _c_sub.execute('INSERT INTO "{tn}" VALUES (NULL, ?, ?, ?, ?, ?, ?)'.format(tn=table),
63
                               (
64
                                   datavalue.ServerTimestamp,
65
                                   datavalue.SourceTimestamp,
66
                                   datavalue.StatusCode.value,
67
                                   str(datavalue.Value.Value),
68
                                   datavalue.Value.VariantType.name,
69
                                   sqlite3.Binary(datavalue.Value.to_binary())
70
                               )
71
                              )
72
            except sqlite3.Error as e:
73 1
                self.logger.error('Historizing SQL Insert Error for %s: %s', node_id, e)
74
75
            self._conn.commit()
76 1
77
            # get this node's period from the period dict and calculate the limit
78 1
            period, count = self._datachanges_period[node_id]
79
80
            def execute_sql_delete(condition, args):
81
                query = ('DELETE FROM "{tn}" WHERE ' + condition).format(tn=table)
82
83
                try:
84
                    _c_sub.execute(query, args)
85
                except sqlite3.Error as e:
86
                    self.logger.error('Historizing SQL Delete Old Data Error for %s: %s', node_id, e)
87
88
                self._conn.commit()
89
90 1
            if period:
91 1
                # after the insert, if a period was specified delete all records older than period
92 1
                date_limit = datetime.utcnow() - period
93
                execute_sql_delete('ServerTimestamp < ?', (date_limit,))
94 1
95
            if count:
96 1
                # ensure that no more than count records are stored for the specified node
97 1
                execute_sql_delete('ServerTimestamp = (SELECT CASE WHEN COUNT(*) > ? '
98 1
                                   'THEN MIN(ServerTimestamp) ELSE NULL END FROM "{tn}")', (count,))
99
100 1
    def read_node_history(self, node_id, start, end, nb_values):
101 1
        with self._lock:
102
            _c_read = self._conn.cursor()
103 1
104 1
            table = self._get_table_name(node_id)
105 1
            start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)
106
107 1
            cont = None
108 1
            results = []
109 1
110
            # select values from the database; recreate UA Variant from binary
111 1
            try:
112 1
                for row in _c_read.execute('SELECT * FROM "{tn}" WHERE "ServerTimestamp" BETWEEN ? AND ? '
113
                                           'ORDER BY "_Id" {dir} LIMIT ?'.format(tn=table, dir=order),
114 1
                                           (start_time, end_time, limit,)):
115
116 1
                    # rebuild the data value object
117
                    dv = ua.DataValue(ua.Variant.from_binary(Buffer(row[6])))
118 1
                    dv.ServerTimestamp = row[1]
119 1
                    dv.SourceTimestamp = row[2]
120
                    dv.StatusCode = ua.StatusCode(row[3])
121
122 1
                    results.append(dv)
123 1
124
            except sqlite3.Error as e:
125
                self.logger.error('Historizing SQL Read Error for %s: %s', node_id, e)
126
127
            if nb_values:
128 1
                if len(results) > nb_values:
129 1
                    cont = results[nb_values].ServerTimestamp
130 1
131 1
                results = results[:nb_values]
132
133 1
            return results, cont
134
135
    def new_historized_event(self, source_id, evtypes, period, count=0):
136
        with self._lock:
137
            _c_new = self._conn.cursor()
138 1
139 1
            # get all fields for the event type nodes
140 1
            ev_fields = self._get_event_fields(evtypes)
141
142 1
            self._datachanges_period[source_id] = period
143
            self._event_fields[source_id] = ev_fields
144 1
145
            table = self._get_table_name(source_id)
146 1
            columns = self._get_event_columns(ev_fields)
147 1
148 1
            # create a table for the event which will store fields generated by the source object's events
149
            # note that _Timestamp is for SQL query, _EventTypeName is for debugging, be careful not to create event
150 1
            # properties with these names
151 1
            try:
152
                _c_new.execute(
153 1
                    'CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL, _Timestamp TIMESTAMP, _EventTypeName TEXT, {co})'
154 1
                    .format(tn=table, co=columns))
155
156
            except sqlite3.Error as e:
157
                self.logger.info('Historizing SQL Table Creation Error for events from %s: %s', source_id, e)
158
159 1
            self._conn.commit()
160 1
161
    def save_event(self, event):
162
        with self._lock:
163
            _c_sub = self._conn.cursor()
164
165
            table = self._get_table_name(event.SourceNode)
166
            columns, placeholders, evtup = self._format_event(event)
167
            event_type = event.EventType  # useful for troubleshooting database
168 1
169
            # insert the event into the database
170 1
            try:
171 1
                _c_sub.execute(
172 1
                    'INSERT INTO "{tn}" ("_Id", "_Timestamp", "_EventTypeName", {co}) VALUES (NULL, "{ts}", "{et}", {pl})'
173
                    .format(tn=table, co=columns, ts=event.Time, et=event_type, pl=placeholders), evtup)
174 1
175 1
            except sqlite3.Error as e:
176 1
                self.logger.error('Historizing SQL Insert Error for events from %s: %s', event.SourceNode, e)
177
178
            self._conn.commit()
179 1
180 1
            # get this node's period from the period dict and calculate the limit
181
            period = self._datachanges_period[event.SourceNode]
182
183
            if period:
184
                # after the insert, if a period was specified delete all records older than period
185
                date_limit = datetime.now() - period
186 1
187
                try:
188
                    _c_sub.execute('DELETE FROM "{tn}" WHERE Time < ?'.format(tn=table),
189 1
                                   (date_limit.isoformat(' '),))
190
                except sqlite3.Error as e:
191 1
                    self.logger.error('Historizing SQL Delete Old Data Error for events from %s: %s', event.SourceNode, e)
192
193
                self._conn.commit()
194
195
    def read_event_history(self, source_id, start, end, nb_values, evfilter):
196
        with self._lock:
197
            _c_read = self._conn.cursor()
198
199
            table = self._get_table_name(source_id)
200
            start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)
201
            clauses, clauses_str = self._get_select_clauses(source_id, evfilter)
202
203 1
            cont = None
204 1
            cont_timestamps = []
205
            results = []
206 1
207
            # select events from the database; SQL select clause is built from EventFilter and available fields
208 1
            try:
209
                for row in _c_read.execute(
210 1
                        'SELECT "_Timestamp", {cl} FROM "{tn}" WHERE "_Timestamp" BETWEEN ? AND ? ORDER BY "_Id" {dir} LIMIT ?'
211 1
                        .format(cl=clauses_str, tn=table, dir=order), (start_time, end_time, limit)):
212 1
213
                    fdict = {}
214 1
                    cont_timestamps.append(row[0])
215 1
                    for i, field in enumerate(row[1:]):
216
                        if field is not None:
217 1
                            fdict[clauses[i]] = ua.Variant.from_binary(Buffer(field))
218 1
                        else:
219 1
                            fdict[clauses[i]] = ua.Variant(None)
220
221 1
                    results.append(events.Event.from_field_dict(fdict))
222 1
223 1
            except sqlite3.Error as e:
224
                self.logger.error('Historizing SQL Read Error events for node %s: %s', source_id, e)
225 1
226 1
            if nb_values:
227
                if len(results) > nb_values:  # start > ua.DateTimeMinValue and
228 1
                    cont = cont_timestamps[nb_values]
229
230 1
                results = results[:nb_values]
231 1
232
            return results, cont
233 1
234 1
    def _get_table_name(self, node_id):
235 1
        return str(node_id.NamespaceIndex) + '_' + str(node_id.Identifier)
236
237
    def _get_event_fields(self, evtypes):
238 1
        """
239 1
        Get all fields from the event types that are to be historized
240
        Args:
241
            evtypes: List of event type nodes
242
243
        Returns: List of fields for all event types
244 1
245 1
        """
246 1
        # get all fields from the event types that are to be historized
247
        ev_aggregate_fields = []
248 1
        for event_type in evtypes:
249 1
            ev_aggregate_fields.extend((events.get_event_properties_from_type_node(event_type)))
250
251 1
        ev_fields = []
252 1
        for field in set(ev_aggregate_fields):
253
            ev_fields.append(field.get_display_name().Text.decode(encoding='utf-8'))
254
        return ev_fields
255 1
256
    @staticmethod
257 1
    def _get_bounds(start, end, nb_values):
258
        order = "ASC"
259
260
        if start is None or start == ua.DateTimeMinValue:
261
            order = "DESC"
262 1
            start = ua.DateTimeMinValue
263 1
264 1
        if end is None or end == ua.DateTimeMinValue:
265
            end = datetime.utcnow() + timedelta(days=1)
266 1
267
        if start < end:
268 1
            start_time = start.isoformat(' ')
269
            end_time = end.isoformat(' ')
270 1
        else:
271 1
            order = "DESC"
272
            start_time = end.isoformat(' ')
273 1
            end_time = start.isoformat(' ')
274 1
275 1
        if nb_values:
276 1
            limit = nb_values + 1  # add 1 to the number of values for retrieving a continuation point
277
        else:
278 1
            limit = -1  # in SQLite a LIMIT of -1 returns all results
279
280
        return start_time, end_time, order, limit
281
282 1
    def _format_event(self, event):
283 1
        """
284 1
        Convert an event object triggered by the subscription into ordered lists for the SQL insert string
285 1
286
        Args:
287 1
            event: The event returned by the subscription
288
289 1
        Returns: List of event fields (SQL column names), List of '?' placeholders, Tuple of variant binaries
290 1
291 1
        """
292 1
        placeholders = []
293 1
        ev_variant_binaries = []
294
295 1
        ev_variant_dict = event.get_event_props_as_fields_dict()
296 1
        names = list(ev_variant_dict.keys())
297 1
        names.sort()  # sort alphabetically since dict is not sorted
298 1
299 1
        # split dict into two synchronized lists which will be converted to SQL strings
300
        # note that the variants are converted to binary objects for storing in SQL BLOB format
301
        for name in names:
302 1
            variant = ev_variant_dict[name]
303 1
            placeholders.append('?')
304
            ev_variant_binaries.append(sqlite3.Binary(variant.to_binary()))
305
306
        return self._list_to_sql_str(names), self._list_to_sql_str(placeholders, False), tuple(ev_variant_binaries)
307
308
    def _get_event_columns(self, ev_fields):
309 1
        fields = []
310
        for field in ev_fields:
311 1
            fields.append(field + ' BLOB')
312
        return self._list_to_sql_str(fields, False)
313 1
314 1
    def _get_select_clauses(self, source_id, evfilter):
315 1
        s_clauses = []
316
        for select_clause in evfilter.SelectClauses:
317
            try:
318
                if not select_clause.BrowsePath:
319 1
                    s_clauses.append(select_clause.Attribute.name)
320 1
                else:
321 1
                    name = select_clause.BrowsePath[0].Name
322 1
                    s_clauses.append(name)
323 1
            except AttributeError:
324
                self.logger.warning('Historizing SQL OPC UA Select Clause Warning for node %s,'
325 1
                                    ' Clause: %s:', source_id, select_clause)
326 1
327
        # remove select clauses that the event type doesn't have; SQL will error because the column doesn't exist
328 1
        clauses = [x for x in s_clauses if x in self._event_fields[source_id]]
329 1
        return clauses, self._list_to_sql_str(clauses)
330 1
331 1
    @staticmethod
332
    def _list_to_sql_str(ls, quotes=True):
333
        sql_str = ''
334
        for item in ls:
335
            if quotes:
336
                sql_str += '"' + item + '", '
337
            else:
338
                sql_str += item + ', '
339
        return sql_str[:-2]  # remove trailing space and comma for SQL syntax
340
341
    def stop(self):
342
        with self._lock:
343
            self._conn.close()
344
            self.logger.info('Historizing SQL connection closed')
345