Completed
Pull Request — master (#494)
by Olivier
03:37
created

HistorySQLite   D

Complexity

Total Complexity 59

Size/Duplication

Total Lines 333
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 333
ccs 0
cts 185
cp 0
rs 4.5454
wmc 59

16 Methods

Rating   Name   Duplication   Size   Complexity  
A _get_table_name() 0 2 1
A stop() 0 4 2
C save_node_value() 0 45 7
A new_historized_node() 0 23 3
A __init__() 0 8 1
C _get_bounds() 0 25 7
A execute_sql_delete() 0 9 2
B new_historized_event() 0 25 3
A _list_to_sql_str() 0 9 3
B _get_select_clauses() 0 16 6
A _get_event_fields() 0 18 3
B _format_event() 0 25 2
B save_event() 0 34 5
B read_node_history() 0 34 6
D read_event_history() 0 38 8
A _get_event_columns() 0 5 2

How to fix   Complexity   

Complex Class

Complex classes like HistorySQLite often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import logging
2
from datetime import timedelta
3
from datetime import datetime
4
from threading import Lock
5
import sqlite3
6
7
from opcua import ua
8
from opcua.common.utils import Buffer
9
from opcua.common import events
10
from opcua.server.history import HistoryStorageInterface
11
12
13
class HistorySQLite(HistoryStorageInterface):
14
    """
15
    history backend which stores data values and object events in a SQLite database
16
    this backend is intended to only be accessed via OPC UA, therefore all UA Variants saved in
17
    the history database are in binary format (SQLite BLOBs)
18
    note that PARSE_DECLTYPES is active so certain data types (such as datetime) will not be BLOBs
19
    """
20
21
    def __init__(self, path="history.db"):
22
        self.logger = logging.getLogger(__name__)
23
        self._datachanges_period = {}
24
        self._db_file = path
25
        self._lock = Lock()
26
        self._event_fields = {}
27
28
        self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
29
30
    def new_historized_node(self, node_id, period, count=0):
31
        with self._lock:
32
            _c_new = self._conn.cursor()
33
34
            table = self._get_table_name(node_id)
35
36
            self._datachanges_period[node_id] = period, count
37
38
            # create a table for the node which will store attributes of the DataValue object
39
            # note: Value/VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
40
            try:
41
                _c_new.execute('CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL,'
42
                               ' ServerTimestamp TIMESTAMP,'
43
                               ' SourceTimestamp TIMESTAMP,'
44
                               ' StatusCode INTEGER,'
45
                               ' Value TEXT,'
46
                               ' VariantType TEXT,'
47
                               ' VariantBinary BLOB)'.format(tn=table))
48
49
            except sqlite3.Error as e:
50
                self.logger.info('Historizing SQL Table Creation Error for %s: %s', node_id, e)
51
52
            self._conn.commit()
53
54
    def save_node_value(self, node_id, datavalue):
55
        with self._lock:
56
            _c_sub = self._conn.cursor()
57
58
            table = self._get_table_name(node_id)
59
60
            # insert the data change into the database
61
            try:
62
                _c_sub.execute('INSERT INTO "{tn}" VALUES (NULL, ?, ?, ?, ?, ?, ?)'.format(tn=table),
63
                               (
64
                                   datavalue.ServerTimestamp,
65
                                   datavalue.SourceTimestamp,
66
                                   datavalue.StatusCode.value,
67
                                   str(datavalue.Value.Value),
68
                                   datavalue.Value.VariantType.name,
69
                                   sqlite3.Binary(datavalue.Value.to_binary())
70
                               )
71
                              )
72
            except sqlite3.Error as e:
73
                self.logger.error('Historizing SQL Insert Error for %s: %s', node_id, e)
74
75
            self._conn.commit()
76
77
            # get this node's period from the period dict and calculate the limit
78
            period, count = self._datachanges_period[node_id]
79
80
            def execute_sql_delete(condition, args):
81
                query = ('DELETE FROM "{tn}" WHERE ' + condition).format(tn=table)
82
83
                try:
84
                    _c_sub.execute(query, args)
85
                except sqlite3.Error as e:
86
                    self.logger.error('Historizing SQL Delete Old Data Error for %s: %s', node_id, e)
87
88
                self._conn.commit()
89
90
            if period:
91
                # after the insert, if a period was specified delete all records older than period
92
                date_limit = datetime.utcnow() - period
93
                execute_sql_delete('ServerTimestamp < ?', (date_limit,))
94
95
            if count:
96
                # ensure that no more than count records are stored for the specified node
97
                execute_sql_delete('ServerTimestamp = (SELECT CASE WHEN COUNT(*) > ? '
98
                                   'THEN MIN(ServerTimestamp) ELSE NULL END FROM "{tn}")', (count,))
99
100
    def read_node_history(self, node_id, start, end, nb_values):
        """
        Read stored data values of a node

        Args:
            node_id: Id of the historized node; selects the table
            start: Start of the requested time range, passed to _get_bounds
            end: End of the requested time range, passed to _get_bounds
            nb_values: Maximum number of values to return; a falsy value returns every match

        Returns: Tuple of (list of DataValue objects, continuation point). The continuation point is the
            ServerTimestamp of the first value beyond nb_values, or None when there is no such value

        sqlite3 errors are logged and not re-raised; values read before the error are still returned.

        """
        with self._lock:
            cursor = self._conn.cursor()

            table = self._get_table_name(node_id)
            start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)

            query = ('SELECT * FROM "{tn}" WHERE "ServerTimestamp" BETWEEN ? AND ? '
                     'ORDER BY "_Id" {dir} LIMIT ?').format(tn=table, dir=order)
            values = []

            # select values from the database; recreate UA Variant from binary
            try:
                for record in cursor.execute(query, (start_time, end_time, limit,)):
                    # column 6 holds the binary variant, columns 1-3 the timestamps and the status code
                    value = ua.DataValue(ua.Variant.from_binary(Buffer(record[6])))
                    value.ServerTimestamp = record[1]
                    value.SourceTimestamp = record[2]
                    value.StatusCode = ua.StatusCode(record[3])
                    values.append(value)

            except sqlite3.Error as e:
                self.logger.error('Historizing SQL Read Error for %s: %s', node_id, e)

            continuation = None
            if nb_values:
                # one extra row was requested; its presence means there is more data to fetch
                if len(values) > nb_values:
                    continuation = values[nb_values].ServerTimestamp
                values = values[:nb_values]

            return values, continuation
134
135
    def new_historized_event(self, source_id, evtypes, period, count=0):
136
        with self._lock:
137
            _c_new = self._conn.cursor()
138
139
            # get all fields for the event type nodes
140
            ev_fields = self._get_event_fields(evtypes)
141
142
            self._datachanges_period[source_id] = period
143
            self._event_fields[source_id] = ev_fields
144
145
            table = self._get_table_name(source_id)
146
            columns = self._get_event_columns(ev_fields)
147
148
            # create a table for the event which will store fields generated by the source object's events
149
            # note that _Timestamp is for SQL query, _EventTypeName is for debugging, be careful not to create event
150
            # properties with these names
151
            try:
152
                _c_new.execute(
153
                    'CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL, _Timestamp TIMESTAMP, _EventTypeName TEXT, {co})'
154
                    .format(tn=table, co=columns))
155
156
            except sqlite3.Error as e:
157
                self.logger.info('Historizing SQL Table Creation Error for events from %s: %s', source_id, e)
158
159
            self._conn.commit()
160
161
    def save_event(self, event):
162
        with self._lock:
163
            _c_sub = self._conn.cursor()
164
165
            table = self._get_table_name(event.SourceNode)
166
            columns, placeholders, evtup = self._format_event(event)
167
            event_type = event.EventType  # useful for troubleshooting database
168
169
            # insert the event into the database
170
            try:
171
                _c_sub.execute(
172
                    'INSERT INTO "{tn}" ("_Id", "_Timestamp", "_EventTypeName", {co}) VALUES (NULL, "{ts}", "{et}", {pl})'
173
                    .format(tn=table, co=columns, ts=event.Time, et=event_type, pl=placeholders), evtup)
174
175
            except sqlite3.Error as e:
176
                self.logger.error('Historizing SQL Insert Error for events from %s: %s', event.SourceNode, e)
177
178
            self._conn.commit()
179
180
            # get this node's period from the period dict and calculate the limit
181
            period = self._datachanges_period[event.SourceNode]
182
183
            if period:
184
                # after the insert, if a period was specified delete all records older than period
185
                date_limit = datetime.utcnow() - period
186
187
                try:
188
                    _c_sub.execute('DELETE FROM "{tn}" WHERE Time < ?'.format(tn=table),
189
                                   (date_limit.isoformat(' '),))
190
                except sqlite3.Error as e:
191
                    self.logger.error('Historizing SQL Delete Old Data Error for events from %s: %s',
192
                                      event.SourceNode, e)
193
194
                self._conn.commit()
195
196
    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        """
        Read stored events of an event source

        Args:
            source_id: Id of the source node; selects the table
            start: Start of the requested time range, passed to _get_bounds
            end: End of the requested time range, passed to _get_bounds
            nb_values: Maximum number of events to return; a falsy value returns every match
            evfilter: Event filter whose select clauses decide which columns are read

        Returns: Tuple of (list of events, continuation point). The continuation point is the _Timestamp
            of the first event beyond nb_values, or None when there is no such event

        sqlite3 errors are logged and not re-raised; events read before the error are still returned.

        """
        with self._lock:
            cursor = self._conn.cursor()

            table = self._get_table_name(source_id)
            start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)
            clauses, clauses_str = self._get_select_clauses(source_id, evfilter)

            # SQL select clause is built from EventFilter and available fields
            query = (
                'SELECT "_Timestamp", {cl} FROM "{tn}" WHERE "_Timestamp" BETWEEN ? AND ? ORDER BY "_Id" {dir} LIMIT ?'
                .format(cl=clauses_str, tn=table, dir=order))

            timestamps = []
            found = []

            try:
                for record in cursor.execute(query, (start_time, end_time, limit)):
                    timestamps.append(record[0])

                    # columns after _Timestamp follow the order of the select clauses
                    field_values = {}
                    for name, blob in zip(clauses, record[1:]):
                        if blob is None:
                            field_values[name] = ua.Variant(None)
                        else:
                            field_values[name] = ua.Variant.from_binary(Buffer(blob))

                    found.append(events.Event.from_field_dict(field_values))

            except sqlite3.Error as e:
                self.logger.error('Historizing SQL Read Error events for node %s: %s', source_id, e)

            continuation = None
            if nb_values:
                # one extra row was requested; its presence means there is more data to fetch
                if len(found) > nb_values:
                    continuation = timestamps[nb_values]
                found = found[:nb_values]

            return found, continuation
234
235
    def _get_table_name(self, node_id):
236
        return str(node_id.NamespaceIndex) + '_' + str(node_id.Identifier)
237
238
    def _get_event_fields(self, evtypes):
239
        """
240
        Get all fields from the event types that are to be historized
241
        Args:
242
            evtypes: List of event type nodes
243
244
        Returns: List of fields for all event types
245
246
        """
247
        # get all fields from the event types that are to be historized
248
        ev_aggregate_fields = []
249
        for event_type in evtypes:
250
            ev_aggregate_fields.extend((events.get_event_properties_from_type_node(event_type)))
251
252
        ev_fields = []
253
        for field in set(ev_aggregate_fields):
254
            ev_fields.append(field.get_display_name().Text)
255
        return ev_fields
256
257
    @staticmethod
    def _get_bounds(start, end, nb_values):
        """
        Turn the requested time range and value count into SQL query parameters

        Args:
            start: Start of the range; None or the value of ua.get_win_epoch() means "not specified"
            end: End of the range; None or the value of ua.get_win_epoch() means "not specified"
            nb_values: Requested number of values; falsy for no restriction

        Returns: Tuple of (lower bound string, upper bound string, "ASC" or "DESC", SQL LIMIT value)

        """
        epoch = ua.get_win_epoch()

        # an unspecified start reads backwards from the end of the range
        descending = start is None or start == epoch
        if descending:
            start = epoch

        if end is None or end == epoch:
            end = datetime.utcnow() + timedelta(days=1)

        if not start < end:
            # bounds given in reverse: swap them for BETWEEN and read backwards
            descending = True
            start, end = end, start

        # add 1 to the number of values for retrieving a continuation point;
        # in SQLite a LIMIT of -1 returns all results
        limit = nb_values + 1 if nb_values else -1

        return start.isoformat(' '), end.isoformat(' '), "DESC" if descending else "ASC", limit
282
283
    def _format_event(self, event):
284
        """
285
        Convert an event object triggered by the subscription into ordered lists for the SQL insert string
286
287
        Args:
288
            event: The event returned by the subscription
289
290
        Returns: List of event fields (SQL column names), List of '?' placeholders, Tuple of variant binaries
291
292
        """
293
        placeholders = []
294
        ev_variant_binaries = []
295
296
        ev_variant_dict = event.get_event_props_as_fields_dict()
297
        names = list(ev_variant_dict.keys())
298
        names.sort()  # sort alphabetically since dict is not sorted
299
300
        # split dict into two synchronized lists which will be converted to SQL strings
301
        # note that the variants are converted to binary objects for storing in SQL BLOB format
302
        for name in names:
303
            variant = ev_variant_dict[name]
304
            placeholders.append('?')
305
            ev_variant_binaries.append(sqlite3.Binary(variant.to_binary()))
306
307
        return self._list_to_sql_str(names), self._list_to_sql_str(placeholders, False), tuple(ev_variant_binaries)
308
309
    def _get_event_columns(self, ev_fields):
310
        fields = []
311
        for field in ev_fields:
312
            fields.append(field + ' BLOB')
313
        return self._list_to_sql_str(fields, False)
314
315
    def _get_select_clauses(self, source_id, evfilter):
316
        s_clauses = []
317
        for select_clause in evfilter.SelectClauses:
318
            try:
319
                if not select_clause.BrowsePath:
320
                    s_clauses.append(select_clause.Attribute.name)
321
                else:
322
                    name = select_clause.BrowsePath[0].Name
323
                    s_clauses.append(name)
324
            except AttributeError:
325
                self.logger.warning('Historizing SQL OPC UA Select Clause Warning for node %s,'
326
                                    ' Clause: %s:', source_id, select_clause)
327
328
        # remove select clauses that the event type doesn't have; SQL will error because the column doesn't exist
329
        clauses = [x for x in s_clauses if x in self._event_fields[source_id]]
330
        return clauses, self._list_to_sql_str(clauses)
331
332
    @staticmethod
333
    def _list_to_sql_str(ls, quotes=True):
334
        sql_str = ''
335
        for item in ls:
336
            if quotes:
337
                sql_str += '"' + item + '", '
338
            else:
339
                sql_str += item + ', '
340
        return sql_str[:-2]  # remove trailing space and comma for SQL syntax
341
342
    def stop(self):
343
        with self._lock:
344
            self._conn.close()
345
            self.logger.info('Historizing SQL connection closed')
346