Completed
Pull Request — master (#171)
by
unknown
06:17
created

HistorySQLite.read_node_history()   F

Complexity

Conditions 13

Size

Total Lines 54

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 33
CRAP Score 13.0976

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 13
c 3
b 0
f 0
dl 0
loc 54
ccs 33
cts 36
cp 0.9167
crap 13.0976
rs 3.5512

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

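Commonly applied refactorings include Extract Method and, when many parameters or temporary variables are involved, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object.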

Complexity

Complex methods like HistorySQLite.read_node_history() are often a sign that the enclosing class does a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
import logging
2 1
from datetime import timedelta
3 1
from datetime import datetime
4 1
from threading import Lock
5 1
from opcua import ua
6 1
from opcua.common.utils import Buffer
7 1
from opcua.server.history import HistoryStorageInterface
8 1
import sqlite3
9
10
11 1
class HistorySQLite(HistoryStorageInterface):
12
    """
13
    very minimal history backend storing data in SQLite database
14
    """
15
16 1
    def __init__(self, path="history.db"):
17 1
        self.logger = logging.getLogger(__name__)
18 1
        self._datachanges_period = {}
19 1
        self._events = {}
20 1
        self._db_file = path
21 1
        self._lock = Lock()
22
        self._event_attributes = {}
23 1
24
        self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
25 1
26 1 View Code Duplication
    def new_historized_node(self, node_id, period, count=0):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
27 1
        with self._lock:
28
            _c_new = self._conn.cursor()
29 1
30
            table = self._get_table_name(node_id)
31 1
32
            self._datachanges_period[node_id] = period, count
33
34
            # create a table for the node which will store attributes of the DataValue object
35 1
            # note: Value/VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
36 1
            try:
37
                _c_new.execute('CREATE TABLE "{tn}" (Id INTEGER PRIMARY KEY NOT NULL,'
38
                               ' ServerTimestamp TIMESTAMP,'
39
                               ' SourceTimestamp TIMESTAMP,'
40
                               ' StatusCode INTEGER,'
41
                               ' Value TEXT,'
42
                               ' VariantType TEXT,'
43
                               ' VariantBinary BLOB)'.format(tn=table))
44
45
            except sqlite3.Error as e:
46
                self.logger.info('Historizing SQL Table Creation Error for %s: %s', node_id, e)
47 1
48
            self._conn.commit()
49 1
50 1
    def save_node_value(self, node_id, datavalue):
51 1
        with self._lock:
52
            _c_sub = self._conn.cursor()
53 1
54
            table = self._get_table_name(node_id)
55
56 1
            # insert the data change into the database
57 1
            try:
58
                _c_sub.execute('INSERT INTO "{tn}" VALUES (NULL, ?, ?, ?, ?, ?, ?)'.format(tn=table),
59
                               (
60
                                   datavalue.ServerTimestamp,
61
                                   datavalue.SourceTimestamp,
62
                                   datavalue.StatusCode.value,
63
                                   str(datavalue.Value.Value),
64
                                   datavalue.Value.VariantType.name,
65
                                   datavalue.Value.to_binary()
66
                               )
67
                               )
68
            except sqlite3.Error as e:
69
                self.logger.error('Historizing SQL Insert Error for %s: %s', node_id, e)
70 1
71
            self._conn.commit()
72
73 1
            # get this node's period from the period dict and calculate the limit
74
            period, count = self._datachanges_period[node_id]
75 1
76
            if period:
77
                # after the insert, if a period was specified delete all records older than period
78
                date_limit = datetime.now() - period
79
80
                try:
81
                    _c_sub.execute('DELETE FROM "{tn}" WHERE ServerTimestamp < ?'.format(tn=table),
82
                                   (date_limit.isoformat(' '),))
83
                except sqlite3.Error as e:
84
                    self.logger.error('Historizing SQL Delete Old Data Error for %s: %s', node_id, e)
85
86
                self._conn.commit()
87 1
88 1
    def read_node_history(self, node_id, start, end, nb_values):
        """
        read the stored data changes of a node within a time range

        node_id: node whose table is queried
        start, end: range matched against ServerTimestamp; None or ua.DateTimeMinValue
            means "not specified" (see _get_bounds)
        nb_values: maximum number of values to return; a falsy value means no limit

        returns (results, cont): results is a list of ua.DataValue, cont is the
        ServerTimestamp of the first value that did not fit into nb_values, else None
        SQLite errors are logged and whatever was read before the error is returned
        """
        with self._lock:
            _c_read = self._conn.cursor()

            order, start_time, end_time, limit = self._get_bounds(start, end, nb_values)

            table = self._get_table_name(node_id)

            cont = None
            results = []

            # select values from the database; recreate UA Variant from binary
            try:
                for row in _c_read.execute('SELECT * FROM "{tn}" WHERE "ServerTimestamp" BETWEEN ? AND ? '
                                           'ORDER BY "Id" {dir} LIMIT ?'.format(tn=table, dir=order),
                                           (start_time, end_time, limit,)):
                    results.append(self._row_to_datavalue(row))

            except sqlite3.Error as e:
                self.logger.error('Historizing SQL Read Error for %s: %s', node_id, e)

            if nb_values:
                # no continuation point is reported when start was left unspecified
                if start is not None and start > ua.DateTimeMinValue and len(results) > nb_values:
                    cont = results[nb_values].ServerTimestamp

                results = results[:nb_values]

            return results, cont

    @staticmethod
    def _get_bounds(start, end, nb_values):
        """
        turn the requested range into (order, start_time, end_time, limit) for the SQL query

        an unspecified start or a start that is not before end selects newest-first ("DESC")
        order; an unspecified end becomes one day after the current UTC time; the two
        returned times are ISO strings with the earlier bound first
        """
        order = "ASC"

        if start is None or start == ua.DateTimeMinValue:
            order = "DESC"
            start = ua.DateTimeMinValue

        if end is None or end == ua.DateTimeMinValue:
            end = datetime.utcnow() + timedelta(days=1)

        if start < end:
            start_time = start.isoformat(' ')
            end_time = end.isoformat(' ')
        else:
            order = "DESC"
            start_time = end.isoformat(' ')
            end_time = start.isoformat(' ')

        if nb_values:
            limit = nb_values + 1  # add 1 to the number of values for retrieving a continuation point
        else:
            limit = -1  # in SQLite a LIMIT of -1 returns all results

        return order, start_time, end_time, limit

    @staticmethod
    def _row_to_datavalue(row):
        """rebuild a ua.DataValue from one row of a node table (column order as in new_historized_node)"""
        dv = ua.DataValue(ua.Variant.from_binary(Buffer(row[6])))
        dv.ServerTimestamp = row[1]
        dv.SourceTimestamp = row[2]
        dv.StatusCode = ua.StatusCode(row[3])
        return dv
142
143 1 View Code Duplication
    def new_historized_event(self, source_id, etype, period):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
144
        with self._lock:
145
            _c_new = self._conn.cursor()
146 1
147
            table = self._get_table_name(source_id)
148
149 1
            # FIXME need to call vars(etype) to get all attributes as strings so proper table columns can be created
150 1
            # FIXME should a table already exist, but a new etype is supplied, table needs to be ALTERED
151
152 1
            self._datachanges_period[source_id] = period
153 1
154 1
            self._event_attributes[source_id] = etype.__dict__.keys()  # get fields (find a better way?)
155
156
            # create a table for the event which will store attributes of the Event object
157
            # note: Value/VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
158
            try:
159
                _c_new.execute('CREATE TABLE "{tn}" (Id INTEGER PRIMARY KEY NOT NULL,'
160
                               ' Timestamp TIMESTAMP,'
161
                               ' Time BLOB,'
162
                               ' ReceiveTime BLOB,'
163
                               ' LocalTime BLOB,'
164
                               ' EventId BLOB,'
165
                               ' EventType BLOB,'
166
                               ' Severity BLOB,'
167
                               ' Message BLOB,'
168
                               ' SourceName BLOB,'
169
                               ' SourceNode BLOB,'
170
                               ' ServerHandle BLOB)'.format(tn=table))
171
172
            except sqlite3.Error as e:
173
                self.logger.info('Historizing SQL Table Creation Error for events from %s: %s', source_id, e)
174
175
            self._conn.commit()
176
177
    def save_event(self, event):
178
        with self._lock:
179
            _c_sub = self._conn.cursor()
180
181
            table = self._get_table_name(event.SourceNode)
182
183
            placeholders, evtup = self._format_event(event)
184
185
            # insert the event into the database
186
            # print('INSERT INTO "{tn}" VALUES (NULL, "{ts}", '.format(tn=table, ts=event.Time) + placeholders + ')')
187
            try:
188
                _c_sub.execute('INSERT INTO "{tn}" VALUES (NULL, "{ts}", {pl})'.format(tn=table, ts=event.Time, pl=placeholders), evtup)
189
            except sqlite3.Error as e:
190
                self.logger.error('Historizing SQL Insert Error for events from %s: %s', event.SourceNode, e)
191
192
            self._conn.commit()
193
194
            # get this node's period from the period dict and calculate the limit
195
            period = self._datachanges_period[event.SourceNode]
196
197
            if period:
198
                # after the insert, if a period was specified delete all records older than period
199
                date_limit = datetime.now() - period
200
201
                try:
202
                    _c_sub.execute('DELETE FROM "{tn}" WHERE Time < ?'.format(tn=table),
203
                                   (date_limit.isoformat(' '),))
204
                except sqlite3.Error as e:
205
                    self.logger.error('Historizing SQL Delete Old Data Error for events from %s: %s', event.SourceNode, e)
206
207
                self._conn.commit()
208
209
    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        """
        read the stored events of a source within a time range

        source_id: event source whose table is queried
        start, end: range matched against the Timestamp column; None or ua.DateTimeMinValue
            means "not specified": an unspecified start selects newest-first order, an
            unspecified end becomes one day after the current UTC time
        nb_values: maximum number of events to return; a falsy value means no limit
        evfilter: event filter whose SelectClauses decide which columns are read

        returns (results, cont): results is a list of ua.HistoryEventFieldList, cont is the
        Timestamp of the first event that did not fit into nb_values, else None
        SQLite errors are logged and whatever was read before the error is returned
        """
        with self._lock:

            _c_read = self._conn.cursor()

            order = "ASC"

            if start is None or start == ua.DateTimeMinValue:
                order = "DESC"
                start = ua.DateTimeMinValue

            if end is None or end == ua.DateTimeMinValue:
                end = datetime.utcnow() + timedelta(days=1)

            if start < end:
                start_time = start.isoformat(' ')
                end_time = end.isoformat(' ')
            else:
                order = "DESC"
                start_time = end.isoformat(' ')
                end_time = start.isoformat(' ')

            if nb_values:
                limit = nb_values + 1  # add 1 to the number of values for retrieving a continuation point
            else:
                limit = -1  # in SQLite a LIMIT of -1 returns all results

            table = self._get_table_name(source_id)
            clauses = self._get_select_clauses(source_id, evfilter)

            # Timestamp is always read first so a continuation point can be derived from it;
            # the field list objects built below are only given EventFields
            # an empty clause string still produces invalid SQL, which is logged as before
            columns = '"Timestamp", ' + clauses

            cont = None
            results = []
            timestamps = []

            # select events from the database; SQL select clause is built from EventFilter and available fields
            try:
                for row in _c_read.execute('SELECT {cl} FROM "{tn}" WHERE "Timestamp" BETWEEN ? AND ? '
                                           'ORDER BY "Id" {dir} LIMIT ?'.format(cl=columns, tn=table, dir=order),
                                           (start_time, end_time, limit,)):

                    timestamps.append(row[0])

                    # place all the variants in the event field list object
                    hist_ev_field_list = ua.HistoryEventFieldList()
                    for field in row[1:]:
                        hist_ev_field_list.EventFields.append(ua.Variant.from_binary(Buffer(field)))

                    results.append(hist_ev_field_list)

            except sqlite3.Error as e:
                self.logger.error('Historizing SQL Read Error events for node %s: %s', source_id, e)

            if nb_values:
                if start > ua.DateTimeMinValue and len(results) > nb_values:
                    # timestamps runs parallel to results
                    cont = timestamps[nb_values]

                results = results[:nb_values]

            return results, cont
267
268
    def _get_table_name(self, node_id):
269
        return str(node_id.NamespaceIndex) + '_' + str(node_id.Identifier)
270
271
    def _format_event(self, event_result):
272
        placeholder = ''
273
        ev_variant_binaries = []
274
275
        ev_variants = event_result.get_fields()
276
277
        # convert the variants in each field to binary for storing in SQL BLOBs
278
        for variant in ev_variants:
279
            placeholder += '?, '
280
            ev_variant_binaries.append(variant.to_binary())
281
282
        placeholder = placeholder[:-2]  # remove trailing space and comma for SQL syntax
283
        evtup = tuple(ev_variant_binaries)
284
285
        return placeholder, evtup
286
287
    def _get_event_type_fields(self, etype):
288
        # FIXME finish and test
289
        etype_vars = vars(etype)
290
        etype_attr = etype_vars.keys()
291
292
    def _get_select_clauses(self, source_id, evfilter):
293
        s_clauses = []
294
        for select_clause in evfilter.SelectClauses:
295
            try:
296
                if not select_clause.BrowsePath:
297
                    s_clauses.append(select_clause.Attribute.name)
298
                else:
299
                    name = select_clause.BrowsePath[0].Name
300
                    s_clauses.append(name)
301
            except AttributeError:
302
                pass
303
                # FIXME what to do here?
304
305
        # remove select clauses that the event type doesn't have; SQL will error because the column doesn't exist
306
        clauses = [x for x in s_clauses if self._check(source_id, x)]
307
        clause_str = ''
308
309
        for select_clause in clauses:
310
            clause_str += '"' + select_clause + '", '
311
312
        return clause_str[:-2]  # remove trailing space and comma for SQL syntax
313
314
    def _check(self, source_id, s_clause):
315
        if s_clause in self._event_attributes[source_id]:
316
            return True
317
        else:
318
            return False
319
320
    def stop(self):
321
        with self._lock:
322
            self._conn.close()
323
            self.logger.info('Historizing SQL connection closed')
324