Completed
Pull Request — master (#158)
by Olivier
02:49
created

HistorySQLite.read_node_history()   F

Complexity

Conditions 13

Size

Total Lines 52

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 33
CRAP Score 13.0314
Metric Value
cc 13
dl 0
loc 52
ccs 33
cts 35
cp 0.9429
crap 13.0314
rs 2.9976

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Complexity

Complex methods like HistorySQLite.read_node_history() often do a lot of different things. To break such a method (or the class around it) down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
import logging
2 1
from datetime import timedelta
3 1
from datetime import datetime
4 1
from threading import Lock
5 1
from opcua import ua
6 1
from opcua.common.utils import Buffer
7 1
from opcua.server.history import HistoryStorageInterface
8 1
import sqlite3
9
10
11 1
class HistorySQLite(HistoryStorageInterface):
    """
    very minimal history backend storing data in SQLite database

    Every historized node gets its own table holding one row per stored
    DataValue. All database access goes through a single connection and is
    serialized with a lock.
    """

    def __init__(self, path="history.db"):
        """
        Open the SQLite database used for storage.

        :param path: file name handed to sqlite3.connect
        """
        self.logger = logging.getLogger(__name__)
        # node_id -> (period, count) as given to new_historized_node
        self._datachanges_period = {}
        self._events = {}
        self._db_file = path
        self._lock = Lock()

        # check_same_thread=False allows the connection to be used from threads other than
        # the one creating it; every method takes self._lock before touching it
        self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)

    def new_historized_node(self, node_id, period, count=0):
        """
        Register a node for historizing and create its table.

        :param node_id: node whose data changes will be stored
        :param period: retention period subtracted from the current time in
                       save_node_value; a falsy value disables pruning
        :param count: stored next to the period; not used by this backend
        """
        with self._lock:
            _c_new = self._conn.cursor()

            table = self._get_table_name(node_id)

            self._datachanges_period[node_id] = period, count

            # create a table for the node which will store attributes of the DataValue object
            # note: Value/VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
            try:
                _c_new.execute('CREATE TABLE "{tn}" (Id INTEGER PRIMARY KEY NOT NULL,'
                               ' ServerTimestamp TIMESTAMP,'
                               ' SourceTimestamp TIMESTAMP,'
                               ' StatusCode INTEGER,'
                               ' Value TEXT,'
                               ' VariantType TEXT,'
                               ' VariantBinary BLOB)'.format(tn=table))

            except sqlite3.Error as e:
                # only logged at info level and not re-raised
                self.logger.info('Historizing SQL Table Creation Error for %s: %s', node_id, e)

            self._conn.commit()

    def save_node_value(self, node_id, datavalue):
        """
        Store one DataValue for a node and prune rows older than its period.

        :param node_id: node previously passed to new_historized_node
        :param datavalue: ua.DataValue to store
        :raises KeyError: if node_id was never registered with new_historized_node
        """
        with self._lock:
            _c_sub = self._conn.cursor()

            table = self._get_table_name(node_id)

            # insert the data change into the database
            try:
                _c_sub.execute('INSERT INTO "{tn}" VALUES (NULL, ?, ?, ?, ?, ?, ?)'.format(tn=table),
                               (
                                   datavalue.ServerTimestamp,
                                   datavalue.SourceTimestamp,
                                   datavalue.StatusCode.value,
                                   str(datavalue.Value.Value),
                                   datavalue.Value.VariantType.name,
                                   datavalue.Value.to_binary()
                               )
                               )
            except sqlite3.Error as e:
                self.logger.error('Historizing SQL Insert Error for %s: %s', node_id, e)

            self._conn.commit()

            # get this node's period from the period dict and calculate the limit
            period, count = self._datachanges_period[node_id]

            if period:
                # after the insert, if a period was specified delete all records older than period
                # the cut-off is computed in UTC, like the default end time in _get_bounds;
                # using local time here would shift the retention window by the UTC offset
                # NOTE(review): assumes stored ServerTimestamps are naive UTC - confirm against the writer
                date_limit = datetime.utcnow() - period

                try:
                    _c_sub.execute('DELETE FROM "{tn}" WHERE ServerTimestamp < ?'.format(tn=table),
                                   (date_limit.isoformat(' '),))
                except sqlite3.Error as e:
                    self.logger.error('Historizing SQL Delete Old Data Error for %s: %s', node_id, e)

                self._conn.commit()

    def read_node_history(self, node_id, start, end, nb_values):
        """
        Read stored values of a node between two timestamps.

        :param node_id: node to read history for
        :param start: start time; None or ua.DateTimeMinValue means "not given"
        :param end: end time; None or ua.DateTimeMinValue means "not given"
        :param nb_values: maximum number of values to return, falsy for all
        :return: tuple (results, cont); results is a list of ua.DataValue and
                 cont is the ServerTimestamp of the first value that was left
                 out because of nb_values, or None
        """
        with self._lock:
            _c_read = self._conn.cursor()

            start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)

            # a continuation point is only handed out when an explicit start time was requested
            has_start = start is not None and start > ua.DateTimeMinValue

            table = self._get_table_name(node_id)

            cont = None
            results = []

            # select values from the database ordered by insertion ("Id"); recreate UA Variant from binary
            try:
                for row in _c_read.execute('SELECT * FROM "{tn}" WHERE "ServerTimestamp" BETWEEN ? AND ? '
                                           'ORDER BY "Id" {dir} LIMIT ?'.format(tn=table, dir=order),
                                           (start_time, end_time, limit,)):
                    results.append(self._row_to_datavalue(row))

            except sqlite3.Error as e:
                # rows decoded before the error are still returned
                self.logger.error('Historizing SQL Read Error for %s: %s', node_id, e)

            if nb_values:
                # one extra row was requested; if it is there it marks the continuation point
                if has_start and len(results) > nb_values:
                    cont = results[nb_values].ServerTimestamp

                results = results[:nb_values]

            return results, cont

    @staticmethod
    def _get_bounds(start, end, nb_values):
        """
        Translate a read request into query bounds, sort order and row limit.

        :return: tuple (start_time, end_time, order, limit) where the times are
                 ISO formatted strings with start_time <= end_time
        """
        order = "ASC"

        if start is None or start == ua.DateTimeMinValue:
            # no start time given: read backwards from the end
            order = "DESC"
            start = ua.DateTimeMinValue

        if end is None or end == ua.DateTimeMinValue:
            # no end time given: use a bound one day past the current UTC time
            end = datetime.utcnow() + timedelta(days=1)

        if start < end:
            start_time = start.isoformat(' ')
            end_time = end.isoformat(' ')
        else:
            # reversed range: swap the bounds for BETWEEN and read backwards
            order = "DESC"
            start_time = end.isoformat(' ')
            end_time = start.isoformat(' ')

        if nb_values:
            limit = nb_values + 1  # add 1 to the number of values for retrieving a continuation point
        else:
            limit = -1  # in SQLite a LIMIT of -1 returns all results

        return start_time, end_time, order, limit

    @staticmethod
    def _row_to_datavalue(row):
        """
        Rebuild a ua.DataValue from a table row.

        Column order follows the CREATE TABLE statement: Id, ServerTimestamp,
        SourceTimestamp, StatusCode, Value, VariantType, VariantBinary.
        """
        dv = ua.DataValue(ua.Variant.from_binary(Buffer(row[6])))
        dv.ServerTimestamp = row[1]
        dv.SourceTimestamp = row[2]
        dv.StatusCode = ua.StatusCode(row[3])
        return dv

    def new_historized_event(self, event, period):
        """
        Event history is not supported by this backend.
        """
        raise NotImplementedError

    def save_event(self, event):
        """
        Event history is not supported by this backend.
        """
        raise NotImplementedError

    def read_event_history(self, start, end, evfilter):
        """
        Event history is not supported by this backend.
        """
        raise NotImplementedError

    def _get_table_name(self, node_id):
        """
        Return the table name for a node: "<NamespaceIndex>_<Identifier>".
        """
        return str(node_id.NamespaceIndex) + '_' + str(node_id.Identifier)

    def stop(self):
        """
        Close the database connection.
        """
        with self._lock:
            self._conn.close()
155
156