Completed
Pull Request — master (#172)
by
unknown
06:10
created

HistorySQLite.read_node_history()   F

Complexity

Conditions 13

Size

Total Lines 54

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 33
CRAP Score 13.0976

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 13
c 3
b 0
f 0
dl 0
loc 54
ccs 33
cts 36
cp 0.9167
crap 13.0976
rs 3.5512

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like HistorySQLite.read_node_history() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
import logging
2 1
from datetime import timedelta
3 1
from datetime import datetime
4 1
from threading import Lock
5 1
from opcua import ua
6 1
from opcua.common.utils import Buffer
7 1
from opcua.server.history import HistoryStorageInterface
8 1
import sqlite3
9
10
11 1
class HistorySQLite(HistoryStorageInterface):
12
    """
13
    very minimal history backend storing data in SQLite database
14
    """
15
16 1
    def __init__(self, path="history.db"):
17 1
        self.logger = logging.getLogger(__name__)
18 1
        self._datachanges_period = {}
19 1
        self._events = {}
20 1
        self._db_file = path
21 1
        self._lock = Lock()
22
        self._event_attributes = {}
23 1
24
        self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
25 1
26 1
    def new_historized_node(self, node_id, period, count=0):
27 1
        with self._lock:
28
            _c_new = self._conn.cursor()
29 1
30
            table = self._get_table_name(node_id)
31 1
32
            self._datachanges_period[node_id] = period, count
33
34
            # create a table for the node which will store attributes of the DataValue object
35 1
            # note: Value/VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
36 1
            try:
37
                _c_new.execute('CREATE TABLE "{tn}" (Id INTEGER PRIMARY KEY NOT NULL,'
38
                               ' ServerTimestamp TIMESTAMP,'
39
                               ' SourceTimestamp TIMESTAMP,'
40
                               ' StatusCode INTEGER,'
41
                               ' Value TEXT,'
42
                               ' VariantType TEXT,'
43
                               ' VariantBinary BLOB)'.format(tn=table))
44
45
            except sqlite3.Error as e:
46
                self.logger.info('Historizing SQL Table Creation Error for %s: %s', node_id, e)
47 1
48
            self._conn.commit()
49 1
50 1
    def save_node_value(self, node_id, datavalue):
51 1
        with self._lock:
52
            _c_sub = self._conn.cursor()
53 1
54
            table = self._get_table_name(node_id)
55
56 1
            # insert the data change into the database
57 1
            try:
58
                _c_sub.execute('INSERT INTO "{tn}" VALUES (NULL, ?, ?, ?, ?, ?, ?)'.format(tn=table),
59
                               (
60
                                   datavalue.ServerTimestamp,
61
                                   datavalue.SourceTimestamp,
62
                                   datavalue.StatusCode.value,
63
                                   str(datavalue.Value.Value),
64
                                   datavalue.Value.VariantType.name,
65
                                   datavalue.Value.to_binary()
66
                               )
67
                               )
68
            except sqlite3.Error as e:
69
                self.logger.error('Historizing SQL Insert Error for %s: %s', node_id, e)
70 1
71
            self._conn.commit()
72
73 1
            # get this node's period from the period dict and calculate the limit
74
            period, count = self._datachanges_period[node_id]
75 1
76
            if period:
77
                # after the insert, if a period was specified delete all records older than period
78
                date_limit = datetime.now() - period
79
80
                try:
81
                    _c_sub.execute('DELETE FROM "{tn}" WHERE ServerTimestamp < ?'.format(tn=table),
82
                                   (date_limit.isoformat(' '),))
83
                except sqlite3.Error as e:
84
                    self.logger.error('Historizing SQL Delete Old Data Error for %s: %s', node_id, e)
85
86
                self._conn.commit()
87 1
88 1
    def read_node_history(self, node_id, start, end, nb_values):
89 1
        with self._lock:
90
            _c_read = self._conn.cursor()
91 1
92
            order = "ASC"
93 1
94 1
            if start is None or start == ua.DateTimeMinValue:
95 1
                order = "DESC"
96
                start = ua.DateTimeMinValue
97 1
98 1
            if end is None or end == ua.DateTimeMinValue:
99
                end = datetime.utcnow() + timedelta(days=1)
100 1
101 1
            if start < end:
102 1
                start_time = start.isoformat(' ')
103
                end_time = end.isoformat(' ')
104 1
            else:
105 1
                order = "DESC"
106 1
                start_time = end.isoformat(' ')
107
                end_time = start.isoformat(' ')
108 1
109 1
            if nb_values:
110
                limit = nb_values + 1  # add 1 to the number of values for retrieving a continuation point
111 1
            else:
112
                limit = -1  # in SQLite a LIMIT of -1 returns all results
113 1
114
            table = self._get_table_name(node_id)
115 1
116 1
            cont = None
117
            results = []
118
119 1
            # select values from the database; recreate UA Variant from binary
120 1
            try:
121
                for row in _c_read.execute('SELECT * FROM "{tn}" WHERE "ServerTimestamp" BETWEEN ? AND ? '
122 1
                                           'ORDER BY "Id" {dir} LIMIT ?'.format(tn=table, dir=order), (start_time, end_time, limit,)):
123 1
124 1
                    # rebuild the data value object
125 1
                    dv = ua.DataValue(ua.Variant.from_binary(Buffer(row[6])))
126
                    dv.ServerTimestamp = row[1]
127 1
                    dv.SourceTimestamp = row[2]
128
                    dv.StatusCode = ua.StatusCode(row[3])
129
130
                    results.append(dv)
131
132 1
            except sqlite3.Error as e:
133 1
                self.logger.error('Historizing SQL Read Error for %s: %s', node_id, e)
134 1
135
            if nb_values:
136 1
                if start > ua.DateTimeMinValue and len(results) > nb_values:
137
                    cont = results[nb_values].ServerTimestamp
138 1
139
                results = results[:nb_values]
140 1
141
            return results, cont
142
143 1
    def new_historized_event(self, source_id, etype, period):
144
        with self._lock:
145
            _c_new = self._conn.cursor()
146 1
147
            table = self._get_table_name(source_id)
148
149 1
            # FIXME need a way to add new column for new fields to a table, or build the table from multiple event types
150 1
151
            self._datachanges_period[source_id] = period
152 1
153 1
            self._event_attributes[source_id] = etype.__dict__.keys()  # FIXME: find a better way? use vars()?
154 1
155
            fields = self._get_event_fields(etype)
156
157
            # create a table for the event which will store attributes of the Event object
158
            try:
159
                _c_new.execute('CREATE TABLE "{tn}" (Id INTEGER PRIMARY KEY NOT NULL, '
160
                               'Timestamp TIMESTAMP, '
161
                               '{fd})'.format(tn=table, fd=fields))
162
163
            except sqlite3.Error as e:
164
                self.logger.info('Historizing SQL Table Creation Error for events from %s: %s', source_id, e)
165
166
            self._conn.commit()
167
168
    def save_event(self, event):
169
        with self._lock:
170
            _c_sub = self._conn.cursor()
171
172
            table = self._get_table_name(event.SourceNode)
173
174
            fields, placeholders, evtup = self._format_event(event)
175
176
            # insert the event into the database
177
            try:
178
                _c_sub.execute('INSERT INTO "{tn}" ("Id", "Timestamp", {fd}) VALUES (NULL, "{ts}", '
179
                               '{pl})'.format(tn=table, fd=fields, ts=event.Time, pl=placeholders), evtup)
180
            except sqlite3.Error as e:
181
                self.logger.error('Historizing SQL Insert Error for events from %s: %s', event.SourceNode, e)
182
183
            self._conn.commit()
184
185
            # get this node's period from the period dict and calculate the limit
186
            period = self._datachanges_period[event.SourceNode]
187
188
            if period:
189
                # after the insert, if a period was specified delete all records older than period
190
                date_limit = datetime.now() - period
191
192
                try:
193
                    _c_sub.execute('DELETE FROM "{tn}" WHERE Time < ?'.format(tn=table),
194
                                   (date_limit.isoformat(' '),))
195
                except sqlite3.Error as e:
196
                    self.logger.error('Historizing SQL Delete Old Data Error for events from %s: %s', event.SourceNode, e)
197
198
                self._conn.commit()
199
200
    def read_event_history(self, source_id, start, end, nb_values, evfilter):
201
        with self._lock:
202
203
            _c_read = self._conn.cursor()
204
205
            order = "ASC"
206
207
            if start is None or start == ua.DateTimeMinValue:
208
                order = "DESC"
209
                start = ua.DateTimeMinValue
210
211
            if end is None or end == ua.DateTimeMinValue:
212
                end = datetime.utcnow() + timedelta(days=1)
213
214
            if start < end:
215
                start_time = start.isoformat(' ')
216
                end_time = end.isoformat(' ')
217
            else:
218
                order = "DESC"
219
                start_time = end.isoformat(' ')
220
                end_time = start.isoformat(' ')
221
222
            if nb_values:
223
                limit = nb_values + 1  # add 1 to the number of values for retrieving a continuation point
224
            else:
225
                limit = -1  # in SQLite a LIMIT of -1 returns all results
226
227
            table = self._get_table_name(source_id)
228
            clauses = self._get_select_clauses(source_id, evfilter)
229
230
            cont = None
231
            results = []
232
233
            # select events from the database; SQL select clause is built from EventFilter and available fields
234
            try:
235
                for row in _c_read.execute('SELECT {cl} FROM "{tn}" WHERE "Timestamp" BETWEEN ? AND ? '
236
                                           'ORDER BY "Id" {dir} LIMIT ?'.format(cl=clauses, tn=table, dir=order),
237
                                           (start_time, end_time, limit,)):
238
239
                    # place all the variants in the event field list object
240
                    hist_ev_field_list = ua.HistoryEventFieldList()
241
                    for field in row:
242
                        hist_ev_field_list.EventFields.append(ua.Variant.from_binary(Buffer(field)))
243
244
                    results.append(hist_ev_field_list)
245
246
            except sqlite3.Error as e:
247
                self.logger.error('Historizing SQL Read Error events for node %s: %s', source_id, e)
248
249
            if nb_values:
250
                if start > ua.DateTimeMinValue and len(results) > nb_values:
251
                    cont = results[nb_values].Time
252
253
                results = results[:nb_values]
254
255
            return results, cont
256
257
    def _get_table_name(self, node_id):
258
        return str(node_id.NamespaceIndex) + '_' + str(node_id.Identifier)
259
260
    def _format_event(self, event_result):
261
        placeholders = []
262
        ev_fields = []
263
        ev_variant_binaries = []
264
265
        ev_variant_dict = event_result.get_field_variants()
266
267
        # split dict into two synchronized lists which will be converted to SQL strings
268
        # note that the variants are converted to binary objects for storing in SQL BLOB format
269
        for field, variant in ev_variant_dict.items():
270
            placeholders.append('?')
271
            ev_fields.append(field)
272
            ev_variant_binaries.append(variant.to_binary())
273
274
        return self._list_to_sql_str(ev_fields), self._list_to_sql_str(placeholders, False), tuple(ev_variant_binaries)
275
276
    def _get_event_fields(self, etype):
277
        fields = []
278
        for key, value in vars(etype).items():
279
            if not key.startswith("__") and key is not "_freeze":
280
                fields.append(key + ' BLOB')
281
        return self._list_to_sql_str(fields, False)
282
283
    def _get_select_clauses(self, source_id, evfilter):
284
        s_clauses = []
285
        for select_clause in evfilter.SelectClauses:
286
            try:
287
                if not select_clause.BrowsePath:
288
                    s_clauses.append(select_clause.Attribute.name)
289
                else:
290
                    name = select_clause.BrowsePath[0].Name
291
                    s_clauses.append(name)
292
            except AttributeError:
293
                pass
294
                # FIXME what to do here?
295
296
        # remove select clauses that the event type doesn't have; SQL will error because the column doesn't exist
297
        clauses = [x for x in s_clauses if self._check(source_id, x)]
298
299
        return self._list_to_sql_str(clauses)
300
301
    def _check(self, source_id, s_clause):
302
        if s_clause in self._event_attributes[source_id]:
303
            return True
304
        else:
305
            return False
306
307
    def _list_to_sql_str(self, ls, quotes=True):
308
        sql_str = ''
309
        for item in ls:
310
            if quotes:
311
                sql_str += '"' + item + '", '
312
            else:
313
                sql_str += item + ', '
314
        return sql_str[:-2]  # remove trailing space and comma for SQL syntax
315
316
    def stop(self):
317
        with self._lock:
318
            self._conn.close()
319
            self.logger.info('Historizing SQL connection closed')
320