Completed
Pull Request — master (#180)
by
unknown
04:14
created

HistorySQLite.read_node_history()   F

Complexity

Conditions 12

Size

Total Lines 54

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 33
CRAP Score 12.0832

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 12
c 3
b 0
f 0
dl 0
loc 54
ccs 33
cts 36
cp 0.9167
crap 12.0832
rs 3.5428

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a cohesive part of the method body into a new, well-named method.

Complexity

Complex methods like HistorySQLite.read_node_history() often do a lot of different things. To break such a method (or the class that contains it) down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
import logging
2 1
from datetime import timedelta
3 1
from datetime import datetime
4 1
from threading import Lock
5 1
from opcua import ua
6 1
from opcua.common.utils import Buffer
7 1
from opcua.server.history import HistoryStorageInterface
8 1
import sqlite3
9
10
11 1
class HistorySQLite(HistoryStorageInterface):
12
    """
13
    very minimal history backend storing data in SQLite database
14
    """
15
16 1
    def __init__(self, path="history.db"):
17 1
        self.logger = logging.getLogger(__name__)
18 1
        self._datachanges_period = {}
19 1
        self._events = {}
20 1
        self._db_file = path
21 1
        self._lock = Lock()
22
        self._event_fields = {}
23 1
24
        self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
25 1
26 1 View Code Duplication
    def new_historized_node(self, node_id, period, count=0):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
27 1
        with self._lock:
28
            _c_new = self._conn.cursor()
29 1
30
            table = self._get_table_name(node_id)
31 1
32
            self._datachanges_period[node_id] = period, count
33
34
            # create a table for the node which will store attributes of the DataValue object
35 1
            # note: Value/VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
36 1
            try:
37
                _c_new.execute('CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL,'
38
                               ' ServerTimestamp TIMESTAMP,'
39
                               ' SourceTimestamp TIMESTAMP,'
40
                               ' StatusCode INTEGER,'
41
                               ' Value TEXT,'
42
                               ' VariantType TEXT,'
43
                               ' VariantBinary BLOB)'.format(tn=table))
44
45
            except sqlite3.Error as e:
46
                self.logger.info('Historizing SQL Table Creation Error for %s: %s', node_id, e)
47 1
48
            self._conn.commit()
49 1
50 1
    def save_node_value(self, node_id, datavalue):
51 1
        with self._lock:
52
            _c_sub = self._conn.cursor()
53 1
54
            table = self._get_table_name(node_id)
55
56 1
            # insert the data change into the database
57 1
            try:
58
                _c_sub.execute('INSERT INTO "{tn}" VALUES (NULL, ?, ?, ?, ?, ?, ?)'.format(tn=table),
59
                               (
60
                                   datavalue.ServerTimestamp,
61
                                   datavalue.SourceTimestamp,
62
                                   datavalue.StatusCode.value,
63
                                   str(datavalue.Value.Value),
64
                                   datavalue.Value.VariantType.name,
65
                                   datavalue.Value.to_binary()
66
                               )
67
                               )
68
            except sqlite3.Error as e:
69
                self.logger.error('Historizing SQL Insert Error for %s: %s', node_id, e)
70 1
71
            self._conn.commit()
72
73 1
            # get this node's period from the period dict and calculate the limit
74
            period, count = self._datachanges_period[node_id]
75 1
76
            if period:
77
                # after the insert, if a period was specified delete all records older than period
78
                date_limit = datetime.now() - period
79
80
                try:
81
                    _c_sub.execute('DELETE FROM "{tn}" WHERE ServerTimestamp < ?'.format(tn=table),
82
                                   (date_limit.isoformat(' '),))
83
                except sqlite3.Error as e:
84
                    self.logger.error('Historizing SQL Delete Old Data Error for %s: %s', node_id, e)
85
86
                self._conn.commit()
87 1
88 1
    def read_node_history(self, node_id, start, end, nb_values):
        """
        Read stored data changes of a node.

        node_id: id of the node whose table is queried
        start, end: time bounds; None or ua.DateTimeMinValue means "not given"
        nb_values: maximum number of values to return; a falsy value returns everything

        Returns (results, cont): results is a list of ua.DataValue objects rebuilt from the
        stored rows, cont is the ServerTimestamp of the first value that did not fit into
        nb_values, or None when there is no such value.

        SQLite errors are logged and not raised; the values read so far are returned.
        """
        with self._lock:
            _c_read = self._conn.cursor()

            start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)

            table = self._get_table_name(node_id)

            cont = None
            results = []

            # select values from the database; recreate UA Variant from binary
            try:
                for row in _c_read.execute('SELECT * FROM "{tn}" WHERE "ServerTimestamp" BETWEEN ? AND ? '
                                           'ORDER BY "_Id" {dir} LIMIT ?'.format(tn=table, dir=order),
                                           (start_time, end_time, limit,)):

                    # rebuild the data value object
                    # columns: 1 ServerTimestamp, 2 SourceTimestamp, 3 StatusCode, 6 VariantBinary
                    dv = ua.DataValue(ua.Variant.from_binary(Buffer(row[6])))
                    dv.ServerTimestamp = row[1]
                    dv.SourceTimestamp = row[2]
                    dv.StatusCode = ua.StatusCode(row[3])

                    results.append(dv)

            except sqlite3.Error as e:
                self.logger.error('Historizing SQL Read Error for %s: %s', node_id, e)

            if nb_values:
                # one extra row was requested; if it exists its timestamp is the continuation point
                if len(results) > nb_values:
                    cont = results[nb_values].ServerTimestamp

                results = results[:nb_values]

            return results, cont

    @staticmethod
    def _get_bounds(start, end, nb_values):
        """
        Turn the requested time range and value count into SQL query arguments.

        Returns (start_time, end_time, order, limit): the lower and upper bound as ISO formatted
        text, the sort direction ("ASC" or "DESC") and the value for the LIMIT clause.
        """
        order = "ASC"

        # without a start time the newest values are returned first
        if start is None or start == ua.DateTimeMinValue:
            order = "DESC"
            start = ua.DateTimeMinValue

        # without an end time everything up to one day past the current UTC time is included
        if end is None or end == ua.DateTimeMinValue:
            end = datetime.utcnow() + timedelta(days=1)

        if start < end:
            start_time = start.isoformat(' ')
            end_time = end.isoformat(' ')
        else:
            # reversed range: swap the bounds for BETWEEN and read backwards
            order = "DESC"
            start_time = end.isoformat(' ')
            end_time = start.isoformat(' ')

        if nb_values:
            limit = nb_values + 1  # add 1 to the number of values for retrieving a continuation point
        else:
            limit = -1  # in SQLite a LIMIT of -1 returns all results

        return start_time, end_time, order, limit
142
143 1 View Code Duplication
    def new_historized_event(self, source_id, ev_fields, period):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
144
        with self._lock:
145
            _c_new = self._conn.cursor()
146 1
147
            self._datachanges_period[source_id] = period
148
            self._event_fields[source_id] = ev_fields
149 1
150 1
            table = self._get_table_name(source_id)
151
            columns = self._get_event_columns(ev_fields)
152 1
153 1
            # create a table for the event which will store fields generated by the source object's events
154 1
            # note that _Timestamp is for SQL query, _EventTypeName is for debugging, be careful not to create event
155
            # properties with these names
156
            try:
157
                _c_new.execute('CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL, '
158
                               '_Timestamp TIMESTAMP, '
159
                               '_EventTypeName TEXT, '
160
                               '{co})'.format(tn=table, co=columns))
161
162
            except sqlite3.Error as e:
163
                self.logger.info('Historizing SQL Table Creation Error for events from %s: %s', source_id, e)
164
165
            self._conn.commit()
166
167
    def save_event(self, event):
168
        with self._lock:
169
            _c_sub = self._conn.cursor()
170
171
            table = self._get_table_name(event.SourceNode)
172
            columns, placeholders, evtup = self._format_event(event)
173
            event_type = event.EventType  # useful for troubleshooting database
174
175
            # insert the event into the database
176
            try:
177
                _c_sub.execute('INSERT INTO "{tn}" ("_Id", "_Timestamp", "_EventTypeName", {co}) '
178
                               'VALUES (NULL, "{ts}", "{et}", {pl})'.format(tn=table, co=columns, ts=event.Time, et=event_type, pl=placeholders), evtup)
179
180
            except sqlite3.Error as e:
181
                self.logger.error('Historizing SQL Insert Error for events from %s: %s', event.SourceNode, e)
182
183
            self._conn.commit()
184
185
            # get this node's period from the period dict and calculate the limit
186
            period = self._datachanges_period[event.SourceNode]
187
188
            if period:
189
                # after the insert, if a period was specified delete all records older than period
190
                date_limit = datetime.now() - period
191
192
                try:
193
                    _c_sub.execute('DELETE FROM "{tn}" WHERE Time < ?'.format(tn=table),
194
                                   (date_limit.isoformat(' '),))
195
                except sqlite3.Error as e:
196
                    self.logger.error('Historizing SQL Delete Old Data Error for events from %s: %s', event.SourceNode, e)
197
198
                self._conn.commit()
199
200
    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        """
        Read stored events of an event source.

        source_id: id of the source node whose table is queried
        start, end: time bounds; None or ua.DateTimeMinValue means "not given"
        nb_values: maximum number of events to return; a falsy value returns everything
        evfilter: event filter whose SelectClauses determine the selected columns

        Returns (results, cont): results is a list of ua.HistoryEventFieldList objects, cont is
        the _Timestamp of the first event that did not fit into nb_values, or None.

        SQLite errors are logged and not raised; the events read so far are returned.
        """
        with self._lock:
            cursor = self._conn.cursor()

            # without a start time the newest events come first
            newest_first = start is None or start == ua.DateTimeMinValue
            if newest_first:
                start = ua.DateTimeMinValue

            # without an end time everything up to one day past the current UTC time is included
            if end is None or end == ua.DateTimeMinValue:
                end = datetime.utcnow() + timedelta(days=1)

            if start < end:
                lower, upper = start, end
            else:
                # reversed range: swap the bounds for BETWEEN and read backwards
                newest_first = True
                lower, upper = end, start

            direction = "DESC" if newest_first else "ASC"
            lower_text = lower.isoformat(' ')
            upper_text = upper.isoformat(' ')

            # one extra row is requested to detect a continuation point; in SQLite a LIMIT of -1 returns all results
            limit = nb_values + 1 if nb_values else -1

            table_name = self._get_table_name(source_id)
            select_columns = self._get_select_clauses(source_id, evfilter)

            # SQL select clause is built from EventFilter and available fields
            query = ('SELECT "_Timestamp", {cl} FROM "{tn}" WHERE "_Timestamp" BETWEEN ? AND ? '
                     'ORDER BY "_Id" {dir} LIMIT ?').format(cl=select_columns, tn=table_name, dir=direction)

            cont = None
            timestamps = []
            results = []

            try:
                for row in cursor.execute(query, (lower_text, upper_text, limit,)):
                    field_list = ua.HistoryEventFieldList()

                    # first column is _Timestamp, kept aside for the continuation point
                    timestamps.append(row[0])

                    # the remaining columns are the event fields stored as binary variants
                    for blob in row[1:]:
                        if blob is None:
                            variant = ua.Variant(None)
                        else:
                            variant = ua.Variant.from_binary(Buffer(blob))
                        field_list.EventFields.append(variant)

                    results.append(field_list)

            except sqlite3.Error as err:
                self.logger.error('Historizing SQL Read Error events for node %s: %s', source_id, err)

            if nb_values:
                if len(results) > nb_values:
                    cont = timestamps[nb_values]

                results = results[:nb_values]

            return results, cont
266
267
    def _get_table_name(self, node_id):
268
        return str(node_id.NamespaceIndex) + '_' + str(node_id.Identifier)
269
270
    def _format_event(self, event_result):
271
        placeholders = []
272
        ev_fields = []
273
        ev_variant_binaries = []
274
275
        ev_variant_dict = event_result.get_field_variants()
276
277
        # split dict into two synchronized lists which will be converted to SQL strings
278
        # note that the variants are converted to binary objects for storing in SQL BLOB format
279
        for field, variant in ev_variant_dict.items():
280
            placeholders.append('?')
281
            ev_fields.append(field)
282
            ev_variant_binaries.append(variant.to_binary())
283
284
        return self._list_to_sql_str(ev_fields), self._list_to_sql_str(placeholders, False), tuple(ev_variant_binaries)
285
286
    def _get_event_columns(self, ev_fields):
287
        fields = []
288
        for field in ev_fields:
289
                fields.append(field + ' BLOB')
290
        return self._list_to_sql_str(fields, False)
291
292
    def _get_select_clauses(self, source_id, evfilter):
293
        s_clauses = []
294
        for select_clause in evfilter.SelectClauses:
295
            try:
296
                if not select_clause.BrowsePath:
297
                    s_clauses.append(select_clause.Attribute.name)
298
                else:
299
                    name = select_clause.BrowsePath[0].Name
300
                    s_clauses.append(name)
301
            except AttributeError:
302
                pass
303
                # FIXME what to do here?
304
305
        # remove select clauses that the event type doesn't have; SQL will error because the column doesn't exist
306
        clauses = [x for x in s_clauses if self._check(source_id, x)]
307
308
        return self._list_to_sql_str(clauses)
309
310
    def _check(self, source_id, s_clause):
311
        if s_clause in self._event_fields[source_id]:
312
            return True
313
        else:
314
            return False
315
316
    def _list_to_sql_str(self, ls, quotes=True):
317
        sql_str = ''
318
        for item in ls:
319
            if quotes:
320
                sql_str += '"' + item + '", '
321
            else:
322
                sql_str += item + ', '
323
        return sql_str[:-2]  # remove trailing space and comma for SQL syntax
324
325
    def stop(self):
326
        with self._lock:
327
            self._conn.close()
328
            self.logger.info('Historizing SQL connection closed')
329