Completed
Push — master ( 929991...db7296 )
by Olivier
03:31
created

HistorySQLite.read_node_history()   F

Complexity

Conditions 12

Size

Total Lines 55

Duplication

Lines 10
Ratio 18.18 %

Code Coverage

Tests 33
CRAP Score 12.0268

Importance

Changes 4
Bugs 0 Features 0
Metric  Value
cc      12
c       4
b       0
f       0
dl      10
loc     55
ccs     33
cts     35
cp      0.9429
crap    12.0268
rs      3.4396

How to fix: Long Method, Complexity

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
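
As a minimal sketch of this advice applied to read_node_history(): the block that normalizes start, end and nb_values could be pulled out into one small function named after what it does. The name get_bounds and the MIN_TIME constant (a stand-in for ua.DateTimeMinValue, so the sketch runs without the opcua package) are assumptions made for illustration and are not part of the reviewed code.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical stand-in for ua.DateTimeMinValue (assumption, not the library value).
MIN_TIME = datetime(1601, 1, 1)


def get_bounds(start, end, nb_values, now=None):
    """Return (start_time, end_time, order, limit) for a history query."""
    if now is None:
        # naive UTC timestamp, equivalent to what datetime.utcnow() returns
        now = datetime.now(timezone.utc).replace(tzinfo=None)

    order = "ASC"

    if start is None or start == MIN_TIME:
        order = "DESC"
        start = MIN_TIME

    if end is None or end == MIN_TIME:
        end = now + timedelta(days=1)

    if start >= end:
        # reversed range: read newest first and swap the bounds for BETWEEN
        order = "DESC"
        start, end = end, start

    # one extra row is requested so the caller can derive a continuation point;
    # in SQLite a LIMIT of -1 returns all results
    limit = nb_values + 1 if nb_values else -1

    return start.isoformat(' '), end.isoformat(' '), order, limit
```

With such a helper, the same four lines of setup could serve both read_node_history() and read_event_history(), which is also where the report flags duplicated code.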

Commonly applied refactorings include:

Complexity

Complex units like HistorySQLite.read_node_history() often do a lot of different things. To break such a unit down, we need to identify a cohesive component within the surrounding class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
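
Judged by shared naming, one cohesive group in HistorySQLite is the set of helpers that only build SQL fragments for events (_event_fields, _get_event_columns, _check, _list_to_sql_str). The following is a minimal sketch of Extract Class for that group; the class name EventSqlBuilder and its method names are hypothetical and only illustrate the shape such a component might take.

```python
class EventSqlBuilder:
    """Builds SQL fragments for event tables; holds no database connection."""

    def __init__(self):
        self._event_fields = {}  # source id -> list of historized field names

    def register(self, source_id, ev_fields):
        """Remember which fields are historized for a source."""
        self._event_fields[source_id] = list(ev_fields)

    @staticmethod
    def to_sql_list(items, quotes=True):
        """Join items into a comma-separated SQL list, optionally double-quoted."""
        if quotes:
            items = ['"{}"'.format(item) for item in items]
        return ', '.join(items)

    def column_definitions(self, source_id):
        """Column definitions for CREATE TABLE; every event field is a BLOB."""
        fields = self._event_fields[source_id]
        return self.to_sql_list([f + ' BLOB' for f in fields], quotes=False)

    def select_clauses(self, source_id, requested):
        """Keep only requested columns that exist for this source."""
        known = self._event_fields[source_id]
        return self.to_sql_list([name for name in requested if name in known])


builder = EventSqlBuilder()
builder.register("src", ["Message", "Severity"])
print(builder.column_definitions("src"))
print(builder.select_clauses("src", ["Severity", "Missing"]))
```

Because the builder holds no connection or lock, it can be unit tested without a database, which is the main benefit this refactoring would bring here.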

import logging
from datetime import timedelta
from datetime import datetime
from threading import Lock
from opcua import ua
from opcua.common.utils import Buffer
from opcua.server.history import HistoryStorageInterface
import sqlite3


class HistorySQLite(HistoryStorageInterface):
    """
    history backend which stores data values and object events in a SQLite database
    this backend is intended to only be accessed via OPC UA, therefore all UA Variants saved in
    the history database are in binary format (SQLite BLOBs)
    note that PARSE_DECLTYPES is active so certain data types (such as datetime) will not be BLOBs
    """

    def __init__(self, path="history.db"):
        self.logger = logging.getLogger(__name__)
        self._datachanges_period = {}
        self._db_file = path
        self._lock = Lock()
        self._event_fields = {}
        self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)

    def new_historized_node(self, node_id, period, count=0):
        with self._lock:
            _c_new = self._conn.cursor()

            table = self._get_table_name(node_id)

            self._datachanges_period[node_id] = period, count

            # create a table for the node which will store attributes of the DataValue object
            # note: Value/VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
            try:
                _c_new.execute('CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL,'
                               ' ServerTimestamp TIMESTAMP,'
                               ' SourceTimestamp TIMESTAMP,'
                               ' StatusCode INTEGER,'
                               ' Value TEXT,'
                               ' VariantType TEXT,'
                               ' VariantBinary BLOB)'.format(tn=table))

            except sqlite3.Error as e:
                self.logger.info('Historizing SQL Table Creation Error for %s: %s', node_id, e)

            self._conn.commit()

    def save_node_value(self, node_id, datavalue):
        with self._lock:
            _c_sub = self._conn.cursor()

            table = self._get_table_name(node_id)

            # insert the data change into the database
            try:
                _c_sub.execute('INSERT INTO "{tn}" VALUES (NULL, ?, ?, ?, ?, ?, ?)'.format(tn=table),
                               (
                                   datavalue.ServerTimestamp,
                                   datavalue.SourceTimestamp,
                                   datavalue.StatusCode.value,
                                   str(datavalue.Value.Value),
                                   datavalue.Value.VariantType.name,
                                   sqlite3.Binary(datavalue.Value.to_binary())
                               )
                               )
            except sqlite3.Error as e:
                self.logger.error('Historizing SQL Insert Error for %s: %s', node_id, e)

            self._conn.commit()

            # get this node's period from the period dict and calculate the limit
            period, count = self._datachanges_period[node_id]

            def execute_delete_statement(condition, args):
                # the condition may itself contain a "{tn}" placeholder, so format after concatenating
                query = ('DELETE FROM "{tn}" WHERE ' + condition).format(tn=table)

                try:
                    _c_sub.execute(query, args)
                except sqlite3.Error as e:
                    self.logger.error('Historizing SQL Delete Old Data Error for %s: %s', node_id, e)

                self._conn.commit()

            if period:
                # after the insert, if a period was specified delete all records older than period
                date_limit = datetime.utcnow() - period
                execute_delete_statement('ServerTimestamp < ?', (date_limit,))

            if count:
                # ensure that no more than count records are stored for the specified node
                execute_delete_statement('ServerTimestamp = (SELECT CASE WHEN COUNT(*) > ? THEN MIN(ServerTimestamp) ELSE NULL END FROM "{tn}")', (count,))

    def read_node_history(self, node_id, start, end, nb_values):
        with self._lock:
            _c_read = self._conn.cursor()

            order = "ASC"

            if start is None or start == ua.DateTimeMinValue:
                order = "DESC"
                start = ua.DateTimeMinValue

            if end is None or end == ua.DateTimeMinValue:
                end = datetime.utcnow() + timedelta(days=1)

            if start < end:
                start_time = start.isoformat(' ')
                end_time = end.isoformat(' ')
            else:
                order = "DESC"
                start_time = end.isoformat(' ')
                end_time = start.isoformat(' ')

            if nb_values:
                limit = nb_values + 1  # add 1 to the number of values for retrieving a continuation point
            else:
                limit = -1  # in SQLite a LIMIT of -1 returns all results

            table = self._get_table_name(node_id)

            cont = None
            results = []

            # select values from the database; recreate UA Variant from binary
            try:
                for row in _c_read.execute('SELECT * FROM "{tn}" WHERE "ServerTimestamp" BETWEEN ? AND ? '
                                           'ORDER BY "_Id" {dir} LIMIT ?'.format(tn=table, dir=order),
                                           (start_time, end_time, limit,)):

                    # rebuild the data value object
                    dv = ua.DataValue(ua.Variant.from_binary(Buffer(row[6])))
                    dv.ServerTimestamp = row[1]
                    dv.SourceTimestamp = row[2]
                    dv.StatusCode = ua.StatusCode(row[3])

                    results.append(dv)

            except sqlite3.Error as e:
                self.logger.error('Historizing SQL Read Error for %s: %s', node_id, e)

            if nb_values:
                if len(results) > nb_values:
                    cont = results[nb_values].ServerTimestamp

                results = results[:nb_values]

            return results, cont

    def new_historized_event(self, source_id, ev_fields, period):
        with self._lock:
            _c_new = self._conn.cursor()

            self._datachanges_period[source_id] = period
            self._event_fields[source_id] = ev_fields

            table = self._get_table_name(source_id)
            columns = self._get_event_columns(ev_fields)

            # create a table for the event which will store fields generated by the source object's events
            # note that _Timestamp is for SQL query, _EventTypeName is for debugging, be careful not to create event
            # properties with these names
            try:
                _c_new.execute('CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL, '
                               '_Timestamp TIMESTAMP, '
                               '_EventTypeName TEXT, '
                               '{co})'.format(tn=table, co=columns))

            except sqlite3.Error as e:
                self.logger.info('Historizing SQL Table Creation Error for events from %s: %s', source_id, e)

            self._conn.commit()

    def save_event(self, event):
        with self._lock:
            _c_sub = self._conn.cursor()

            table = self._get_table_name(event.SourceNode)
            columns, placeholders, evtup = self._format_event(event)
            event_type = event.EventType  # useful for troubleshooting database

            # insert the event into the database
            try:
                _c_sub.execute('INSERT INTO "{tn}" ("_Id", "_Timestamp", "_EventTypeName", {co}) '
                               'VALUES (NULL, "{ts}", "{et}", {pl})'.format(tn=table, co=columns, ts=event.Time, et=event_type, pl=placeholders), evtup)

            except sqlite3.Error as e:
                self.logger.error('Historizing SQL Insert Error for events from %s: %s', event.SourceNode, e)

            self._conn.commit()

            # get this node's period from the period dict and calculate the limit
            period = self._datachanges_period[event.SourceNode]

            if period:
                # after the insert, if a period was specified delete all records older than period
                # use UTC here, consistent with the other time calculations in this class
                date_limit = datetime.utcnow() - period

                try:
                    # compare against _Timestamp, the text column kept for SQL queries;
                    # event property columns such as Time are stored as BLOBs and cannot be compared to a date string
                    _c_sub.execute('DELETE FROM "{tn}" WHERE _Timestamp < ?'.format(tn=table),
                                   (date_limit.isoformat(' '),))
                except sqlite3.Error as e:
                    self.logger.error('Historizing SQL Delete Old Data Error for events from %s: %s', event.SourceNode, e)

                self._conn.commit()

    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        with self._lock:
            _c_read = self._conn.cursor()

            order = "ASC"

            if start is None or start == ua.DateTimeMinValue:
                order = "DESC"
                start = ua.DateTimeMinValue

            if end is None or end == ua.DateTimeMinValue:
                end = datetime.utcnow() + timedelta(days=1)

            if start < end:
                start_time = start.isoformat(' ')
                end_time = end.isoformat(' ')
            else:
                order = "DESC"
                start_time = end.isoformat(' ')
                end_time = start.isoformat(' ')

            if nb_values:
                limit = nb_values + 1  # add 1 to the number of values for retrieving a continuation point
            else:
                limit = -1  # in SQLite a LIMIT of -1 returns all results

            table = self._get_table_name(source_id)
            clauses = self._get_select_clauses(source_id, evfilter)

            cont = None
            cont_timestamps = []
            results = []

            # select events from the database; SQL select clause is built from EventFilter and available fields
            try:
                for row in _c_read.execute('SELECT "_Timestamp", {cl} FROM "{tn}" WHERE "_Timestamp" BETWEEN ? AND ? '
                                           'ORDER BY "_Id" {dir} LIMIT ?'.format(cl=clauses, tn=table, dir=order),
                                           (start_time, end_time, limit,)):

                    # place all the variants in the event field list object
                    hist_ev_field_list = ua.HistoryEventFieldList()
                    for i, field in enumerate(row):
                        # if the field is the _Timestamp column store it in a list used for getting the continuation
                        if i == 0:
                            cont_timestamps.append(field)
                        elif field is not None:
                            hist_ev_field_list.EventFields.append(ua.Variant.from_binary(Buffer(field)))
                        else:
                            hist_ev_field_list.EventFields.append(ua.Variant(None))

                    results.append(hist_ev_field_list)

            except sqlite3.Error as e:
                self.logger.error('Historizing SQL Read Error events for node %s: %s', source_id, e)

            if nb_values:
                if len(results) > nb_values:  # start > ua.DateTimeMinValue and
                    cont = cont_timestamps[nb_values]

                results = results[:nb_values]

            return results, cont

    def _get_table_name(self, node_id):
        return str(node_id.NamespaceIndex) + '_' + str(node_id.Identifier)

    def _format_event(self, event_result):
        placeholders = []
        ev_fields = []
        ev_variant_binaries = []

        ev_variant_dict = event_result.get_event_props_as_fields_dict()

        # split dict into two synchronized lists which will be converted to SQL strings
        # note that the variants are converted to binary objects for storing in SQL BLOB format
        for field, variant in ev_variant_dict.items():
            placeholders.append('?')
            ev_fields.append(field)
            ev_variant_binaries.append(sqlite3.Binary(variant.to_binary()))

        return self._list_to_sql_str(ev_fields), self._list_to_sql_str(placeholders, False), tuple(ev_variant_binaries)

    def _get_event_columns(self, ev_fields):
        fields = []
        for field in ev_fields:
            fields.append(field + ' BLOB')
        return self._list_to_sql_str(fields, False)

    def _get_select_clauses(self, source_id, evfilter):
        s_clauses = []
        for select_clause in evfilter.SelectClauses:
            try:
                if not select_clause.BrowsePath:
                    s_clauses.append(select_clause.Attribute.name)
                else:
                    name = select_clause.BrowsePath[0].Name
                    s_clauses.append(name)
            except AttributeError:
                self.logger.warning('Historizing SQL OPC UA Select Clause Warning for node %s,'
                                    ' Clause: %s:', source_id, select_clause)

        # remove select clauses that the event type doesn't have; SQL will error because the column doesn't exist
        clauses = [x for x in s_clauses if self._check(source_id, x)]

        return self._list_to_sql_str(clauses)

    def _check(self, source_id, s_clause):
        return s_clause in self._event_fields[source_id]

    def _list_to_sql_str(self, ls, quotes=True):
        if quotes:
            ls = ['"' + item + '"' for item in ls]
        return ', '.join(ls)  # join leaves no trailing comma and space to trim

    def stop(self):
        with self._lock:
            self._conn.close()
            self.logger.info('Historizing SQL connection closed')
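
The paging scheme used by read_node_history() (request nb_values + 1 rows, then use the extra row's timestamp as the continuation point) can be tried in isolation. The sketch below is an assumption-laden illustration: it uses an in-memory SQLite table with the hypothetical name "demo" and plain text timestamps instead of OPC UA data values, and the function name read_page is not part of the reviewed code.

```python
import sqlite3


def read_page(conn, start_time, end_time, nb_values):
    """Return (rows, cont); cont is the timestamp of the first row left out, or None."""
    limit = nb_values + 1 if nb_values else -1  # in SQLite a LIMIT of -1 returns all results
    rows = conn.execute(
        'SELECT ServerTimestamp, Value FROM "demo" '
        'WHERE ServerTimestamp BETWEEN ? AND ? ORDER BY _Id ASC LIMIT ?',
        (start_time, end_time, limit)).fetchall()

    cont = None
    if nb_values:
        if len(rows) > nb_values:
            # the extra row is not returned; its timestamp tells the caller where to resume
            cont = rows[nb_values][0]
        rows = rows[:nb_values]
    return rows, cont


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE "demo" (_Id INTEGER PRIMARY KEY NOT NULL, ServerTimestamp TEXT, Value TEXT)')
conn.executemany('INSERT INTO "demo" VALUES (NULL, ?, ?)',
                 [('2020-01-01 00:00:0%d' % i, str(i)) for i in range(5)])

end = '2020-01-02 00:00:00'
page, cont = read_page(conn, '2020-01-01 00:00:00', end, 2)
while cont is not None:
    # resume from the continuation point until no extra row comes back
    more, cont = read_page(conn, cont, end, 2)
    page.extend(more)
```

Resuming with the continuation point as the new start works here because BETWEEN is inclusive, so the row that was held back becomes the first row of the next page.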