Completed
Pull Request — master (#180)
by
unknown
03:40
created

HistorySQLite.read_event_history()   F

Complexity

Conditions 15

Size

Total Lines 66

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 39
CRAP Score 15.0818

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 15
c 2
b 0
f 0
dl 0
loc 66
ccs 39
cts 42
cp 0.9286
crap 15.0818
rs 2.6873

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Complexity

Complex code units like HistorySQLite.read_event_history() often do a lot of different things. To break such a unit down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
import logging
2 1
from datetime import timedelta
3 1
from datetime import datetime
4 1
from threading import Lock
5 1
from opcua import ua
6 1
from opcua.common.utils import Buffer
7 1
from opcua.server.history import HistoryStorageInterface
8 1
import sqlite3
9
10
11 1
class HistorySQLite(HistoryStorageInterface):
12
    """
13
    very minimal history backend storing data in SQLite database
14
    """
15
16 1
    def __init__(self, path="history.db"):
17 1
        self.logger = logging.getLogger(__name__)
18 1
        self._datachanges_period = {}
19 1
        self._db_file = path
20 1
        self._lock = Lock()
21 1
        self._event_fields = {}
22
23 1
        self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
24
25 1 View Code Duplication
    def new_historized_node(self, node_id, period, count=0):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
26 1
        with self._lock:
27 1
            _c_new = self._conn.cursor()
28
29 1
            table = self._get_table_name(node_id)
30
31 1
            self._datachanges_period[node_id] = period, count
32
33
            # create a table for the node which will store attributes of the DataValue object
34
            # note: Value/VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
35 1
            try:
36 1
                _c_new.execute('CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL,'
37
                               ' ServerTimestamp TIMESTAMP,'
38
                               ' SourceTimestamp TIMESTAMP,'
39
                               ' StatusCode INTEGER,'
40
                               ' Value TEXT,'
41
                               ' VariantType TEXT,'
42
                               ' VariantBinary BLOB)'.format(tn=table))
43
44
            except sqlite3.Error as e:
45
                self.logger.info('Historizing SQL Table Creation Error for %s: %s', node_id, e)
46
47 1
            self._conn.commit()
48
49 1
    def save_node_value(self, node_id, datavalue):
50 1
        with self._lock:
51 1
            _c_sub = self._conn.cursor()
52
53 1
            table = self._get_table_name(node_id)
54
55
            # insert the data change into the database
56 1
            try:
57 1
                _c_sub.execute('INSERT INTO "{tn}" VALUES (NULL, ?, ?, ?, ?, ?, ?)'.format(tn=table),
58
                               (
59
                                   datavalue.ServerTimestamp,
60
                                   datavalue.SourceTimestamp,
61
                                   datavalue.StatusCode.value,
62
                                   str(datavalue.Value.Value),
63
                                   datavalue.Value.VariantType.name,
64
                                   datavalue.Value.to_binary()
65
                               )
66
                               )
67
            except sqlite3.Error as e:
68
                self.logger.error('Historizing SQL Insert Error for %s: %s', node_id, e)
69
70 1
            self._conn.commit()
71
72
            # get this node's period from the period dict and calculate the limit
73 1
            period, count = self._datachanges_period[node_id]
74
75 1
            if period:
76
                # after the insert, if a period was specified delete all records older than period
77
                date_limit = datetime.now() - period
78
79
                try:
80
                    _c_sub.execute('DELETE FROM "{tn}" WHERE ServerTimestamp < ?'.format(tn=table),
81
                                   (date_limit.isoformat(' '),))
82
                except sqlite3.Error as e:
83
                    self.logger.error('Historizing SQL Delete Old Data Error for %s: %s', node_id, e)
84
85
                self._conn.commit()
86
87 1
    def read_node_history(self, node_id, start, end, nb_values):
        """
        Read stored data changes of a node within a time window.

        :param node_id: node id whose table is queried
        :param start: first bound of the window; None or ua.DateTimeMinValue selects
                      ua.DateTimeMinValue and returns rows in descending _Id order
        :param end: second bound of the window; None or ua.DateTimeMinValue selects utcnow() + 1 day
        :param nb_values: maximum number of values to return; a falsy value returns all rows
        :return: tuple (list of ua.DataValue, cont) where cont is the ServerTimestamp of the
                 first value that did not fit into nb_values, or None
        """
        with self._lock:
            cursor = self._conn.cursor()

            order = "ASC"

            if start is None or start == ua.DateTimeMinValue:
                order = "DESC"
                start = ua.DateTimeMinValue

            if end is None or end == ua.DateTimeMinValue:
                end = datetime.utcnow() + timedelta(days=1)

            # BETWEEN needs the smaller bound first; a reversed window means "newest first"
            if not start < end:
                order = "DESC"
                start, end = end, start
            start_time = start.isoformat(' ')
            end_time = end.isoformat(' ')

            # add 1 to the number of values for retrieving a continuation point;
            # in SQLite a LIMIT of -1 returns all results
            limit = nb_values + 1 if nb_values else -1

            table = self._get_table_name(node_id)

            cont = None
            results = []

            query = ('SELECT * FROM "{tn}" WHERE "ServerTimestamp" BETWEEN ? AND ? '
                     'ORDER BY "_Id" {dir} LIMIT ?').format(tn=table, dir=order)

            # select values from the database; recreate UA Variant from binary
            try:
                for row in cursor.execute(query, (start_time, end_time, limit,)):
                    # rebuild the data value object; column 6 is VariantBinary
                    dv = ua.DataValue(ua.Variant.from_binary(Buffer(row[6])))
                    dv.ServerTimestamp = row[1]
                    dv.SourceTimestamp = row[2]
                    dv.StatusCode = ua.StatusCode(row[3])

                    results.append(dv)

            except sqlite3.Error as e:
                self.logger.error('Historizing SQL Read Error for %s: %s', node_id, e)

            if nb_values:
                if len(results) > nb_values:
                    cont = results[nb_values].ServerTimestamp

                results = results[:nb_values]

            return results, cont
141
142 1 View Code Duplication
    def new_historized_event(self, source_id, ev_fields, period):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
143 1
        with self._lock:
144 1
            _c_new = self._conn.cursor()
145
146 1
            self._datachanges_period[source_id] = period
147 1
            self._event_fields[source_id] = ev_fields
148
149 1
            table = self._get_table_name(source_id)
150 1
            columns = self._get_event_columns(ev_fields)
151
152
            # create a table for the event which will store fields generated by the source object's events
153
            # note that _Timestamp is for SQL query, _EventTypeName is for debugging, be careful not to create event
154
            # properties with these names
155 1
            try:
156 1
                _c_new.execute('CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL, '
157
                               '_Timestamp TIMESTAMP, '
158
                               '_EventTypeName TEXT, '
159
                               '{co})'.format(tn=table, co=columns))
160
161
            except sqlite3.Error as e:
162
                self.logger.info('Historizing SQL Table Creation Error for events from %s: %s', source_id, e)
163
164 1
            self._conn.commit()
165
166 1
    def save_event(self, event):
167 1
        with self._lock:
168 1
            _c_sub = self._conn.cursor()
169
170 1
            table = self._get_table_name(event.SourceNode)
171 1
            columns, placeholders, evtup = self._format_event(event)
172 1
            event_type = event.EventType  # useful for troubleshooting database
173
174
            # insert the event into the database
175 1
            try:
176 1
                _c_sub.execute('INSERT INTO "{tn}" ("_Id", "_Timestamp", "_EventTypeName", {co}) '
177
                               'VALUES (NULL, "{ts}", "{et}", {pl})'.format(tn=table, co=columns, ts=event.Time, et=event_type, pl=placeholders), evtup)
178
179
            except sqlite3.Error as e:
180
                self.logger.error('Historizing SQL Insert Error for events from %s: %s', event.SourceNode, e)
181
182 1
            self._conn.commit()
183
184
            # get this node's period from the period dict and calculate the limit
185 1
            period = self._datachanges_period[event.SourceNode]
186
187 1
            if period:
188
                # after the insert, if a period was specified delete all records older than period
189
                date_limit = datetime.now() - period
190
191
                try:
192
                    _c_sub.execute('DELETE FROM "{tn}" WHERE Time < ?'.format(tn=table),
193
                                   (date_limit.isoformat(' '),))
194
                except sqlite3.Error as e:
195
                    self.logger.error('Historizing SQL Delete Old Data Error for events from %s: %s', event.SourceNode, e)
196
197
                self._conn.commit()
198
199 1
    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        """
        Read stored events of a source within a time window.

        :param source_id: node id of the event source whose table is queried
        :param start: first bound of the window; None or ua.DateTimeMinValue selects
                      ua.DateTimeMinValue and returns rows in descending _Id order
        :param end: second bound of the window; None or ua.DateTimeMinValue selects utcnow() + 1 day
        :param nb_values: maximum number of events to return; a falsy value returns all rows
        :param evfilter: event filter whose SelectClauses choose the columns to read
        :return: tuple (list of ua.HistoryEventFieldList, cont) where cont is the _Timestamp of
                 the first event that did not fit into nb_values, or None
        """
        with self._lock:
            cursor = self._conn.cursor()

            start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)

            table = self._get_table_name(source_id)
            clauses = self._get_select_clauses(source_id, evfilter)

            cont = None
            cont_timestamps = []
            results = []

            query = ('SELECT "_Timestamp", {cl} FROM "{tn}" WHERE "_Timestamp" BETWEEN ? AND ? '
                     'ORDER BY "_Id" {dir} LIMIT ?').format(cl=clauses, tn=table, dir=order)

            # select events from the database; SQL select clause is built from EventFilter and available fields
            try:
                for row in cursor.execute(query, (start_time, end_time, limit,)):
                    # the first column is _Timestamp, kept only for the continuation point
                    cont_timestamps.append(row[0])
                    results.append(self._row_to_event_fields(row[1:]))

            except sqlite3.Error as e:
                self.logger.error('Historizing SQL Read Error events for node %s: %s', source_id, e)

            if nb_values:
                if len(results) > nb_values:
                    cont = cont_timestamps[nb_values]

                results = results[:nb_values]

            return results, cont

    @staticmethod
    def _get_bounds(start, end, nb_values):
        """
        Turn the requested window into SQL parameters.

        :return: tuple (start_time, end_time, order, limit) with both times as
                 isoformat(' ') strings, smaller bound first
        """
        order = "ASC"

        if start is None or start == ua.DateTimeMinValue:
            order = "DESC"
            start = ua.DateTimeMinValue

        if end is None or end == ua.DateTimeMinValue:
            end = datetime.utcnow() + timedelta(days=1)

        # BETWEEN needs the smaller bound first; a reversed window means "newest first"
        if not start < end:
            order = "DESC"
            start, end = end, start

        # add 1 to the number of values for retrieving a continuation point;
        # in SQLite a LIMIT of -1 returns all results
        limit = nb_values + 1 if nb_values else -1

        return start.isoformat(' '), end.isoformat(' '), order, limit

    @staticmethod
    def _row_to_event_fields(fields):
        """
        Place the stored field values of one row in a ua.HistoryEventFieldList.

        :param fields: the row's event field columns (without _Timestamp); None marks an empty field
        """
        hist_ev_field_list = ua.HistoryEventFieldList()
        for field in fields:
            if field is not None:
                hist_ev_field_list.EventFields.append(ua.Variant.from_binary(Buffer(field)))
            else:
                hist_ev_field_list.EventFields.append(ua.Variant(None))
        return hist_ev_field_list
265
266 1
    def _get_table_name(self, node_id):
267 1
        return str(node_id.NamespaceIndex) + '_' + str(node_id.Identifier)
268
269 1
    def _format_event(self, event_result):
270 1
        placeholders = []
271 1
        ev_fields = []
272 1
        ev_variant_binaries = []
273
274 1
        ev_variant_dict = event_result.get_event_props_as_fields_dict()
275
276
        # split dict into two synchronized lists which will be converted to SQL strings
277
        # note that the variants are converted to binary objects for storing in SQL BLOB format
278 1
        for field, variant in ev_variant_dict.items():
279 1
            placeholders.append('?')
280 1
            ev_fields.append(field)
281 1
            ev_variant_binaries.append(variant.to_binary())
282
283 1
        return self._list_to_sql_str(ev_fields), self._list_to_sql_str(placeholders, False), tuple(ev_variant_binaries)
284
285 1
    def _get_event_columns(self, ev_fields):
286 1
        fields = []
287 1
        for field in ev_fields:
288 1
                fields.append(field + ' BLOB')
289 1
        return self._list_to_sql_str(fields, False)
290
291 1
    def _get_select_clauses(self, source_id, evfilter):
292 1
        s_clauses = []
293 1
        for select_clause in evfilter.SelectClauses:
294 1
            try:
295 1
                if not select_clause.BrowsePath:
296
                    s_clauses.append(select_clause.Attribute.name)
297
                else:
298 1
                    name = select_clause.BrowsePath[0].Name
299 1
                    s_clauses.append(name)
300
            except AttributeError:
301
                self.logger.error('Historizing SQL OPC UA Select Clauses Error for node %s', source_id)
302
303
        # remove select clauses that the event type doesn't have; SQL will error because the column doesn't exist
304 1
        clauses = [x for x in s_clauses if self._check(source_id, x)]
305
306 1
        return self._list_to_sql_str(clauses)
307
308 1
    def _check(self, source_id, s_clause):
309 1
        if s_clause in self._event_fields[source_id]:
310 1
            return True
311
        else:
312
            return False
313
314 1
    def _list_to_sql_str(self, ls, quotes=True):
315 1
        sql_str = ''
316 1
        for item in ls:
317 1
            if quotes:
318 1
                sql_str += '"' + item + '", '
319
            else:
320 1
                sql_str += item + ', '
321 1
        return sql_str[:-2]  # remove trailing space and comma for SQL syntax
322
323 1
    def stop(self):
324 1
        with self._lock:
325 1
            self._conn.close()
326
            self.logger.info('Historizing SQL connection closed')
327