Completed
Pull Request — master (#180)
by
unknown
03:09
created

HistorySQLite.read_node_history()   F

Complexity

Conditions 12

Size

Total Lines 54

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 34
CRAP Score 12.0247

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 12
c 3
b 0
f 0
dl 0
loc 54
ccs 34
cts 36
cp 0.9444
crap 12.0247
rs 3.5428

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include: Extract Method.

Complexity

Complex methods like HistorySQLite.read_node_history() often do a lot of different things. To break such a method down, we need to identify a cohesive component within its class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
import logging
2 1
from datetime import timedelta
3 1
from datetime import datetime
4 1
from threading import Lock
5 1
from opcua import ua
6 1
from opcua.common.utils import Buffer
7 1
from opcua.server.history import HistoryStorageInterface
8 1
import sqlite3
9
10
11 1
class HistorySQLite(HistoryStorageInterface):
    """
    Very minimal history backend storing data in an SQLite database.

    Every historized node and every historized event source gets its own
    table, named by ``_get_table_name``. A single connection is opened with
    ``check_same_thread=False`` and every public method takes ``self._lock``
    before touching it.
    """

    def __init__(self, path="history.db"):
        """
        Open (or create) the SQLite database file at ``path``.
        """
        self.logger = logging.getLogger(__name__)
        # node_id -> (period, count) for nodes, source_id -> period for event sources
        self._datachanges_period = {}
        self._db_file = path
        self._lock = Lock()
        # source_id -> event field names registered by new_historized_event
        self._event_fields = {}

        self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)

    def new_historized_node(self, node_id, period, count=0):
        """
        Register a node for historizing and create its table.

        ``period`` is the retention window subtracted from the current time in
        ``save_node_value`` (presumably a ``timedelta``; a falsy value disables
        purging). ``count`` is recorded next to the period but is not enforced
        anywhere in this class.
        """
        with self._lock:
            _c_new = self._conn.cursor()

            table = self._get_table_name(node_id)

            self._datachanges_period[node_id] = period, count

            # create a table for the node which will store attributes of the DataValue object
            # note: Value/VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
            try:
                _c_new.execute('CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL,'
                               ' ServerTimestamp TIMESTAMP,'
                               ' SourceTimestamp TIMESTAMP,'
                               ' StatusCode INTEGER,'
                               ' Value TEXT,'
                               ' VariantType TEXT,'
                               ' VariantBinary BLOB)'.format(tn=table))

            except sqlite3.Error as e:
                # logged at info level only; presumably the usual cause is that the table already exists
                self.logger.info('Historizing SQL Table Creation Error for %s: %s', node_id, e)

            self._conn.commit()

    def save_node_value(self, node_id, datavalue):
        """
        Store one data change of ``node_id`` and purge rows older than the node's period.

        SQLite errors are logged, not raised. A ``KeyError`` is raised if the
        node was never registered with ``new_historized_node``.
        """
        with self._lock:
            _c_sub = self._conn.cursor()

            table = self._get_table_name(node_id)

            # insert the data change into the database
            try:
                _c_sub.execute('INSERT INTO "{tn}" VALUES (NULL, ?, ?, ?, ?, ?, ?)'.format(tn=table),
                               (
                                   datavalue.ServerTimestamp,
                                   datavalue.SourceTimestamp,
                                   datavalue.StatusCode.value,
                                   str(datavalue.Value.Value),
                                   datavalue.Value.VariantType.name,
                                   datavalue.Value.to_binary()
                               )
                               )
            except sqlite3.Error as e:
                self.logger.error('Historizing SQL Insert Error for %s: %s', node_id, e)

            self._conn.commit()

            # get this node's period from the period dict and calculate the limit
            period, count = self._datachanges_period[node_id]

            if period:
                # after the insert, if a period was specified delete all records older than period;
                # the cutoff is computed in UTC so that it matches the clock the read methods use
                date_limit = datetime.utcnow() - period

                try:
                    _c_sub.execute('DELETE FROM "{tn}" WHERE ServerTimestamp < ?'.format(tn=table),
                                   (date_limit.isoformat(' '),))
                except sqlite3.Error as e:
                    self.logger.error('Historizing SQL Delete Old Data Error for %s: %s', node_id, e)

                self._conn.commit()

    def read_node_history(self, node_id, start, end, nb_values):
        """
        Read stored data values of ``node_id`` between ``start`` and ``end``.

        Returns ``(results, cont)``: ``results`` is a list of ``ua.DataValue``
        rebuilt from the stored binary variants, truncated to ``nb_values``
        when that is truthy; ``cont`` is the ServerTimestamp of the first value
        that did not fit, or ``None``. SQLite errors are logged and whatever
        was read so far is returned.
        """
        with self._lock:
            _c_read = self._conn.cursor()

            start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)

            table = self._get_table_name(node_id)

            cont = None
            results = []

            # select values from the database; recreate UA Variant from binary
            try:
                for row in _c_read.execute('SELECT * FROM "{tn}" WHERE "ServerTimestamp" BETWEEN ? AND ? '
                                           'ORDER BY "_Id" {dir} LIMIT ?'.format(tn=table, dir=order),
                                           (start_time, end_time, limit,)):

                    # rebuild the data value object; column order follows the CREATE TABLE statement
                    dv = ua.DataValue(ua.Variant.from_binary(Buffer(row[6])))
                    dv.ServerTimestamp = row[1]
                    dv.SourceTimestamp = row[2]
                    dv.StatusCode = ua.StatusCode(row[3])

                    results.append(dv)

            except sqlite3.Error as e:
                self.logger.error('Historizing SQL Read Error for %s: %s', node_id, e)

            if nb_values:
                # one extra row was requested; if it arrived, it marks the continuation point
                if len(results) > nb_values:
                    cont = results[nb_values].ServerTimestamp

                results = results[:nb_values]

            return results, cont

    def new_historized_event(self, source_id, ev_fields, period):
        """
        Register an event source for historizing and create its table.

        ``ev_fields`` are the event field names; each becomes a BLOB column.
        ``period`` is the retention window used by ``save_event``.
        """
        with self._lock:
            _c_new = self._conn.cursor()

            self._datachanges_period[source_id] = period
            self._event_fields[source_id] = ev_fields

            table = self._get_table_name(source_id)
            columns = self._get_event_columns(ev_fields)

            # create a table for the event which will store fields generated by the source object's events
            # note that _Timestamp is for SQL query, _EventTypeName is for debugging, be careful not to create event
            # properties with these names
            try:
                _c_new.execute('CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL, '
                               '_Timestamp TIMESTAMP, '
                               '_EventTypeName TEXT, '
                               '{co})'.format(tn=table, co=columns))

            except sqlite3.Error as e:
                # logged at info level only; presumably the usual cause is that the table already exists
                self.logger.info('Historizing SQL Table Creation Error for events from %s: %s', source_id, e)

            self._conn.commit()

    def save_event(self, event):
        """
        Store one event in the table of its source node and purge rows older than the source's period.

        SQLite errors are logged, not raised. A ``KeyError`` is raised if the
        source was never registered with ``new_historized_event``.
        """
        with self._lock:
            _c_sub = self._conn.cursor()

            table = self._get_table_name(event.SourceNode)
            columns, placeholders, evtup = self._format_event(event)
            event_type = event.EventType  # useful for troubleshooting database

            # insert the event into the database; the timestamp and event type are bound as
            # parameters (same text as before) instead of being pasted into the statement
            try:
                _c_sub.execute('INSERT INTO "{tn}" ("_Id", "_Timestamp", "_EventTypeName", {co}) '
                               'VALUES (NULL, ?, ?, {pl})'.format(tn=table, co=columns, pl=placeholders),
                               (str(event.Time), str(event_type)) + evtup)

            except sqlite3.Error as e:
                self.logger.error('Historizing SQL Insert Error for events from %s: %s', event.SourceNode, e)

            self._conn.commit()

            # get this node's period from the period dict and calculate the limit
            period = self._datachanges_period[event.SourceNode]

            if period:
                # after the insert, if a period was specified delete all records older than period;
                # the cutoff is computed in UTC so that it matches the clock the read methods use
                date_limit = datetime.utcnow() - period

                try:
                    # _Timestamp is the column kept for time queries; event fields such as Time are BLOBs
                    _c_sub.execute('DELETE FROM "{tn}" WHERE _Timestamp < ?'.format(tn=table),
                                   (date_limit.isoformat(' '),))
                except sqlite3.Error as e:
                    self.logger.error('Historizing SQL Delete Old Data Error for events from %s: %s', event.SourceNode, e)

                self._conn.commit()

    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        """
        Read stored events of ``source_id`` between ``start`` and ``end``.

        The selected columns come from ``evfilter.SelectClauses``, restricted
        to the fields registered for the source. Returns ``(results, cont)``:
        ``results`` is a list of ``ua.HistoryEventFieldList``, truncated to
        ``nb_values`` when that is truthy; ``cont`` is the ``_Timestamp`` of
        the first event that did not fit, or ``None``. SQLite errors are logged
        and whatever was read so far is returned.
        """
        with self._lock:
            _c_read = self._conn.cursor()

            start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)

            table = self._get_table_name(source_id)
            clauses = self._get_select_clauses(source_id, evfilter)

            cont = None
            cont_timestamps = []
            results = []

            # select events from the database; SQL select clause is built from EventFilter and available fields
            try:
                for row in _c_read.execute('SELECT "_Timestamp", {cl} FROM "{tn}" WHERE "_Timestamp" BETWEEN ? AND ? '
                                           'ORDER BY "_Id" {dir} LIMIT ?'.format(cl=clauses, tn=table, dir=order),
                                           (start_time, end_time, limit,)):

                    # place all the variants in the event field list object
                    hist_ev_field_list = ua.HistoryEventFieldList()

                    # the first column is _Timestamp, kept aside for the continuation point
                    cont_timestamps.append(row[0])

                    for field in row[1:]:
                        if field is not None:
                            hist_ev_field_list.EventFields.append(ua.Variant.from_binary(Buffer(field)))
                        else:
                            hist_ev_field_list.EventFields.append(ua.Variant(None))

                    results.append(hist_ev_field_list)

            except sqlite3.Error as e:
                self.logger.error('Historizing SQL Read Error events for node %s: %s', source_id, e)

            if nb_values:
                # one extra row was requested; if it arrived, it marks the continuation point
                if len(results) > nb_values:
                    cont = cont_timestamps[nb_values]

                results = results[:nb_values]

            return results, cont

    @staticmethod
    def _get_bounds(start, end, nb_values):
        """
        Turn a history read request into SQL bounds.

        Returns ``(start_time, end_time, order, limit)`` where the two times
        are ISO strings with ``start_time`` the earlier one, ``order`` is
        ``"ASC"`` or ``"DESC"`` and ``limit`` is the SQL LIMIT value.
        """
        order = "ASC"

        # no start given: read backwards from the end
        if start is None or start == ua.DateTimeMinValue:
            order = "DESC"
            start = ua.DateTimeMinValue

        # no end given: use a bound one day past the current UTC time
        if end is None or end == ua.DateTimeMinValue:
            end = datetime.utcnow() + timedelta(days=1)

        if start < end:
            start_time = start.isoformat(' ')
            end_time = end.isoformat(' ')
        else:
            # reversed bounds mean newest first; BETWEEN still needs the lower bound first
            order = "DESC"
            start_time = end.isoformat(' ')
            end_time = start.isoformat(' ')

        if nb_values:
            limit = nb_values + 1  # add 1 to the number of values for retrieving a continuation point
        else:
            limit = -1  # in SQLite a LIMIT of -1 returns all results

        return start_time, end_time, order, limit

    def _get_table_name(self, node_id):
        """
        Build the table name ``<NamespaceIndex>_<Identifier>`` for a node id.
        """
        return str(node_id.NamespaceIndex) + '_' + str(node_id.Identifier)

    def _format_event(self, event_result):
        """
        Split an event into the pieces of an INSERT statement.

        Returns ``(columns, placeholders, binaries)``: a quoted column list, a
        matching ``?`` list, and a tuple with each field's variant in binary form.
        """
        ev_items = list(event_result.get_event_props_as_field_dict().items())

        # three synchronized sequences; the variants are converted to binary for the BLOB columns
        ev_fields = [field for field, _ in ev_items]
        placeholders = ['?'] * len(ev_items)
        ev_variant_binaries = tuple(variant.to_binary() for _, variant in ev_items)

        return self._list_to_sql_str(ev_fields), self._list_to_sql_str(placeholders, False), ev_variant_binaries

    def _get_event_columns(self, ev_fields):
        """
        Build the column definitions (``<field> BLOB, ...``) for an event table.
        """
        return self._list_to_sql_str([field + ' BLOB' for field in ev_fields], False)

    def _get_select_clauses(self, source_id, evfilter):
        """
        Build the quoted column list for an event SELECT from ``evfilter.SelectClauses``.

        A clause without a browse path contributes its attribute name,
        otherwise the name of the first browse path element. Clauses that raise
        ``AttributeError`` are logged and skipped.
        """
        s_clauses = []
        for select_clause in evfilter.SelectClauses:
            try:
                if not select_clause.BrowsePath:
                    s_clauses.append(select_clause.Attribute.name)
                else:
                    name = select_clause.BrowsePath[0].Name
                    s_clauses.append(name)
            except AttributeError:
                self.logger.error('Historizing SQL OPC UA Select Clauses Error for node %s', source_id)

        # remove select clauses that the event type doesn't have; SQL will error because the column doesn't exist
        clauses = [x for x in s_clauses if self._check(source_id, x)]

        return self._list_to_sql_str(clauses)

    def _check(self, source_id, s_clause):
        """
        Return True if ``s_clause`` is one of the fields registered for ``source_id``.
        """
        return s_clause in self._event_fields[source_id]

    def _list_to_sql_str(self, ls, quotes=True):
        """
        Join strings into a comma separated SQL list, double-quoting each item when ``quotes`` is true.
        """
        if quotes:
            return ', '.join('"' + item + '"' for item in ls)
        return ', '.join(ls)

    def stop(self):
        """
        Close the database connection.
        """
        with self._lock:
            self._conn.close()
            self.logger.info('Historizing SQL connection closed')
327