Completed
Pull Request — master (#171)
by
unknown
06:22
created

HistorySQLite.read_node_history()   F

Complexity

Conditions 13

Size

Total Lines 54

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 33
CRAP Score 13.0976

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 13
c 3
b 0
f 0
dl 0
loc 54
ccs 33
cts 36
cp 0.9167
crap 13.0976
rs 3.5512

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method, as described above.

Complexity

Complex classes like HistorySQLite.read_node_history() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
import logging
2 1
from datetime import timedelta
3 1
from datetime import datetime
4 1
from threading import Lock
5 1
from opcua import ua
6 1
from opcua.common.utils import Buffer
7 1
from opcua.server.history import HistoryStorageInterface
8 1
import sqlite3
9
10
11 1
class HistorySQLite(HistoryStorageInterface):
12
    """
13
    very minimal history backend storing data in SQLite database
14
    """
15
16 1
    def __init__(self, path="history.db"):
17 1
        self.logger = logging.getLogger(__name__)
18 1
        self._datachanges_period = {}
19 1
        self._events = {}
20 1
        self._db_file = path
21 1
        self._lock = Lock()
22
        self._event_attributes = {}
23 1
24
        self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
25 1
26 1
    def new_historized_node(self, node_id, period, count=0):
27 1
        with self._lock:
28
            _c_new = self._conn.cursor()
29 1
30
            table = self._get_table_name(node_id)
31 1
32
            self._datachanges_period[node_id] = period, count
33
34
            # create a table for the node which will store attributes of the DataValue object
35 1
            # note: Value/VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
36 1
            try:
37
                _c_new.execute('CREATE TABLE "{tn}" (Id INTEGER PRIMARY KEY NOT NULL,'
38
                               ' ServerTimestamp TIMESTAMP,'
39
                               ' SourceTimestamp TIMESTAMP,'
40
                               ' StatusCode INTEGER,'
41
                               ' Value TEXT,'
42
                               ' VariantType TEXT,'
43
                               ' VariantBinary BLOB)'.format(tn=table))
44
45
            except sqlite3.Error as e:
46
                self.logger.info('Historizing SQL Table Creation Error for %s: %s', node_id, e)
47 1
48
            self._conn.commit()
49 1
50 1
    def save_node_value(self, node_id, datavalue):
51 1
        with self._lock:
52
            _c_sub = self._conn.cursor()
53 1
54
            table = self._get_table_name(node_id)
55
56 1
            # insert the data change into the database
57 1
            try:
58
                _c_sub.execute('INSERT INTO "{tn}" VALUES (NULL, ?, ?, ?, ?, ?, ?)'.format(tn=table),
59
                               (
60
                                   datavalue.ServerTimestamp,
61
                                   datavalue.SourceTimestamp,
62
                                   datavalue.StatusCode.value,
63
                                   str(datavalue.Value.Value),
64
                                   datavalue.Value.VariantType.name,
65
                                   datavalue.Value.to_binary()
66
                               )
67
                               )
68
            except sqlite3.Error as e:
69
                self.logger.error('Historizing SQL Insert Error for %s: %s', node_id, e)
70 1
71
            self._conn.commit()
72
73 1
            # get this node's period from the period dict and calculate the limit
74
            period, count = self._datachanges_period[node_id]
75 1
76
            if period:
77
                # after the insert, if a period was specified delete all records older than period
78
                date_limit = datetime.now() - period
79
80
                try:
81
                    _c_sub.execute('DELETE FROM "{tn}" WHERE ServerTimestamp < ?'.format(tn=table),
82
                                   (date_limit.isoformat(' '),))
83
                except sqlite3.Error as e:
84
                    self.logger.error('Historizing SQL Delete Old Data Error for %s: %s', node_id, e)
85
86
                self._conn.commit()
87 1
88 1
    def read_node_history(self, node_id, start, end, nb_values):
        """
        Read stored data changes of ``node_id`` between ``start`` and ``end``.

        start, end: time bounds; None or ua.DateTimeMinValue means "open".
            An open start or start >= end makes the result newest-first.
        nb_values: maximum number of values to return; a falsy value returns
            all matching rows.

        Returns ``(results, cont)``: a list of ua.DataValue objects and a
        continuation point, which is the ServerTimestamp of the first value
        beyond ``nb_values`` or None. SQL errors are logged and lead to a
        (possibly partial) result instead of an exception.
        """
        with self._lock:
            _c_read = self._conn.cursor()

            start, order, start_time, end_time, limit = self._node_history_bounds(start, end, nb_values)

            table = self._get_table_name(node_id)

            cont = None
            results = []

            # select values from the database; recreate UA Variant from binary
            try:
                for row in _c_read.execute('SELECT * FROM "{tn}" WHERE "ServerTimestamp" BETWEEN ? AND ? '
                                           'ORDER BY "Id" {dir} LIMIT ?'.format(tn=table, dir=order), (start_time, end_time, limit,)):
                    results.append(self._node_row_to_datavalue(row))

            except sqlite3.Error as e:
                self.logger.error('Historizing SQL Read Error for %s: %s', node_id, e)

            if nb_values:
                # one extra row was requested; if it exists it marks where to continue
                if start > ua.DateTimeMinValue and len(results) > nb_values:
                    cont = results[nb_values].ServerTimestamp

                results = results[:nb_values]

            return results, cont

    @staticmethod
    def _node_history_bounds(start, end, nb_values):
        """Normalize the read bounds; return (start, order, start_time, end_time, limit)."""
        order = "ASC"

        if start is None or start == ua.DateTimeMinValue:
            order = "DESC"
            start = ua.DateTimeMinValue

        if end is None or end == ua.DateTimeMinValue:
            end = datetime.utcnow() + timedelta(days=1)

        if start < end:
            start_time = start.isoformat(' ')
            end_time = end.isoformat(' ')
        else:
            # reversed bounds: swap them for BETWEEN and read newest-first
            order = "DESC"
            start_time = end.isoformat(' ')
            end_time = start.isoformat(' ')

        if nb_values:
            limit = nb_values + 1  # add 1 to the number of values for retrieving a continuation point
        else:
            limit = -1  # in SQLite a LIMIT of -1 returns all results

        return start, order, start_time, end_time, limit

    @staticmethod
    def _node_row_to_datavalue(row):
        """Rebuild a ua.DataValue from one row of a node table (column order as created)."""
        dv = ua.DataValue(ua.Variant.from_binary(Buffer(row[6])))
        dv.ServerTimestamp = row[1]
        dv.SourceTimestamp = row[2]
        dv.StatusCode = ua.StatusCode(row[3])
        return dv
142
143 1
    def new_historized_event(self, source_id, etype, period):
144
        with self._lock:
145
            _c_new = self._conn.cursor()
146 1
147
            table = self._get_table_name(source_id)
148
149 1
            # FIXME need to call vars(etype) to get all attributes as strings so proper table columns can be created
150 1
            # FIXME should a table already exist, but a new etype is supplied, table needs to be ALTERED
151
152 1
            self._datachanges_period[source_id] = period
153 1
154 1
            self._event_attributes[source_id] = etype.__dict__.keys()  # get fields (find a better way?)
155
156
            fields = self._get_event_fields(etype)
157
158
            # create a table for the event which will store attributes of the Event object
159
            # note: Value/VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
160
            try:
161
                _c_new.execute('CREATE TABLE "{tn}" (Id INTEGER PRIMARY KEY NOT NULL, '
162
                               'Timestamp TIMESTAMP, '
163
                               '{fd})'.format(tn=table, fd=fields))
164
165
            except sqlite3.Error as e:
166
                self.logger.info('Historizing SQL Table Creation Error for events from %s: %s', source_id, e)
167
168
            self._conn.commit()
169
170
    def save_event(self, event):
171
        with self._lock:
172
            _c_sub = self._conn.cursor()
173
174
            table = self._get_table_name(event.SourceNode)
175
176
            fields, placeholders, evtup = self._format_event(event)
177
178
            # insert the event into the database
179
            # print('INSERT INTO "{tn}" VALUES (NULL, "{ts}", '.format(tn=table, ts=event.Time) + placeholders + ')')
180
            try:
181
                _c_sub.execute('INSERT INTO "{tn}" ("Id", "Timestamp", {fd}) VALUES (NULL, "{ts}", {pl})'.format(tn=table, fd=fields, ts=event.Time, pl=placeholders), evtup)
182
            except sqlite3.Error as e:
183
                self.logger.error('Historizing SQL Insert Error for events from %s: %s', event.SourceNode, e)
184
185
            self._conn.commit()
186
187
            # get this node's period from the period dict and calculate the limit
188
            period = self._datachanges_period[event.SourceNode]
189
190
            if period:
191
                # after the insert, if a period was specified delete all records older than period
192
                date_limit = datetime.now() - period
193
194
                try:
195
                    _c_sub.execute('DELETE FROM "{tn}" WHERE Time < ?'.format(tn=table),
196
                                   (date_limit.isoformat(' '),))
197
                except sqlite3.Error as e:
198
                    self.logger.error('Historizing SQL Delete Old Data Error for events from %s: %s', event.SourceNode, e)
199
200
                self._conn.commit()
201
202
    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        """
        Read stored events of ``source_id`` between ``start`` and ``end``.

        start, end: time bounds; None or ua.DateTimeMinValue means "open".
            An open start or start >= end makes the result newest-first.
        nb_values: maximum number of events to return; a falsy value returns
            all matching rows.
        evfilter: event filter whose SelectClauses decide which columns are read.

        Returns ``(results, cont)``: a list of ua.HistoryEventFieldList
        objects and a continuation point, which is the Timestamp column of
        the first row beyond ``nb_values`` or None. SQL errors are logged and
        lead to a (possibly partial) result instead of an exception.
        """
        with self._lock:

            _c_read = self._conn.cursor()

            order = "ASC"

            if start is None or start == ua.DateTimeMinValue:
                order = "DESC"
                start = ua.DateTimeMinValue

            if end is None or end == ua.DateTimeMinValue:
                end = datetime.utcnow() + timedelta(days=1)

            if start < end:
                start_time = start.isoformat(' ')
                end_time = end.isoformat(' ')
            else:
                order = "DESC"
                start_time = end.isoformat(' ')
                end_time = start.isoformat(' ')

            if nb_values:
                limit = nb_values + 1  # add 1 to the number of values for retrieving a continuation point
            else:
                limit = -1  # in SQLite a LIMIT of -1 returns all results

            table = self._get_table_name(source_id)
            clauses = self._get_select_clauses(source_id, evfilter)

            cont = None
            results = []
            # Timestamp column of every fetched row, parallel to `results`; the field
            # list objects built below carry only EventFields, so the continuation
            # point has to come from here
            timestamps = []

            # select events from the database; SQL select clause is built from EventFilter and available fields.
            # "Timestamp" is always selected first; an empty clause list still yields an SQL error that is logged
            try:
                for row in _c_read.execute('SELECT "Timestamp", {cl} FROM "{tn}" WHERE "Timestamp" BETWEEN ? AND ? '
                                           'ORDER BY "Id" {dir} LIMIT ?'.format(cl=clauses, tn=table, dir=order), (start_time, end_time, limit,)):

                    timestamps.append(row[0])

                    # place all the variants in the event field list object
                    hist_ev_field_list = ua.HistoryEventFieldList()
                    for field in row[1:]:
                        hist_ev_field_list.EventFields.append(ua.Variant.from_binary(Buffer(field)))

                    results.append(hist_ev_field_list)

            except sqlite3.Error as e:
                self.logger.error('Historizing SQL Read Error events for node %s: %s', source_id, e)

            if nb_values:
                # one extra row was requested; if it exists it marks where to continue
                if start > ua.DateTimeMinValue and len(results) > nb_values:
                    cont = timestamps[nb_values]

                results = results[:nb_values]

            return results, cont
260
261
    def _get_table_name(self, node_id):
262
        return str(node_id.NamespaceIndex) + '_' + str(node_id.Identifier)
263
264
    def _format_event(self, event_result):
265
        placeholders = []
266
        ev_fields = []
267
        ev_variant_binaries = []
268
269
        ev_variant_dict = event_result.get_field_variants()
270
271
        # split dict into two synchronized lists which will be converted to SQL strings
272
        # note that the variants are converted to binary objects for storing in BLOB format
273
        for field, variant in ev_variant_dict.items():
274
            placeholders.append('?')
275
            ev_fields.append(field)
276
            ev_variant_binaries.append(variant.to_binary())
277
278
        return self._list_to_sql_str(ev_fields), self._list_to_sql_str(placeholders, False), tuple(ev_variant_binaries)
279
280
    def _get_event_fields(self, etype):
281
        fields = []
282
        for key, value in vars(etype).items():
283
            if not key.startswith("__") and key is not "_freeze":
284
                fields.append(key + ' BLOB')
285
        return self._list_to_sql_str(fields, False)
286
287
    def _get_select_clauses(self, source_id, evfilter):
288
        s_clauses = []
289
        for select_clause in evfilter.SelectClauses:
290
            try:
291
                if not select_clause.BrowsePath:
292
                    s_clauses.append(select_clause.Attribute.name)
293
                else:
294
                    name = select_clause.BrowsePath[0].Name
295
                    s_clauses.append(name)
296
            except AttributeError:
297
                pass
298
                # FIXME what to do here?
299
300
        # remove select clauses that the event type doesn't have; SQL will error because the column doesn't exist
301
        clauses = [x for x in s_clauses if self._check(source_id, x)]
302
303
        return self._list_to_sql_str(clauses)
304
305
    def _check(self, source_id, s_clause):
306
        if s_clause in self._event_attributes[source_id]:
307
            return True
308
        else:
309
            return False
310
311
    def _list_to_sql_str(self, l, quotes=True):
312
        sql_str = ''
313
        for item in l:
314
            if quotes:
315
                sql_str += '"' + item + '", '
316
            else:
317
                sql_str += item + ', '
318
        return sql_str[:-2]  # remove trailing space and comma for SQL syntax
319
320
    def stop(self):
321
        with self._lock:
322
            self._conn.close()
323
            self.logger.info('Historizing SQL connection closed')
324