Completed
Pull Request — master (#158)
by Olivier
02:49
created

HistorySQLite.stop()   A

Complexity

Conditions 2

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 2
Metric Value
cc 2
dl 0
loc 3
ccs 3
cts 3
cp 1
crap 2
rs 10
1 1
import logging
2 1
from datetime import timedelta
3 1
from datetime import datetime
4 1
from threading import Lock
5 1
from opcua import ua
6 1
from opcua.common.utils import Buffer
7 1
from opcua.server.history import HistoryStorageInterface
8 1
import sqlite3
9
10
11 1
class HistorySQLite(HistoryStorageInterface):
    """
    Very minimal history backend storing data in an SQLite database.

    Every historized node gets its own table, named by _get_table_name().
    All database access goes through a single connection that is opened with
    check_same_thread=False and guarded by self._lock.
    """

    def __init__(self, path="history.db"):
        """
        Open the SQLite database used to store the history.

        path: file name handed to sqlite3.connect()
        """
        self.logger = logging.getLogger(__name__)
        # node_id -> (period, count) as passed to new_historized_node()
        self._datachanges_period = {}
        self._events = {}
        self._db_file = path
        self._lock = Lock()

        # PARSE_DECLTYPES lets sqlite3 convert the TIMESTAMP columns back to datetime objects on read
        self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)

    def new_historized_node(self, node_id, period, count=0):
        """
        Register a node for historizing and create its table.

        node_id: node whose data changes will be stored
        period: timedelta; rows older than this are deleted after each insert (falsy means keep forever)
        count: maximum number of rows to keep for the node (0 means no limit)

        A failure to create the table is only logged; the node is registered in any case.
        """
        with self._lock:
            _c_new = self._conn.cursor()

            table = self._get_table_name(node_id)

            self._datachanges_period[node_id] = period, count

            # create a table for the node which will store attributes of the DataValue object
            # note: Value/VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
            try:
                _c_new.execute('CREATE TABLE "{tn}" (Id INTEGER PRIMARY KEY NOT NULL,'
                               ' ServerTimestamp TIMESTAMP,'
                               ' SourceTimestamp TIMESTAMP,'
                               ' StatusCode INTEGER,'
                               ' Value TEXT,'
                               ' VariantType TEXT,'
                               ' VariantBinary BLOB)'.format(tn=table))

            except sqlite3.Error as e:
                self.logger.info('Historizing SQL Table Creation Error for %s: %s', node_id, e)

            self._conn.commit()

    def save_node_value(self, node_id, datavalue):
        """
        Store one DataValue for a node and apply the node's retention limits.

        node_id: node the value belongs to
        datavalue: value to store; its timestamps, status code and variant are written to the node's table

        SQLite errors are logged, not raised. After the insert, rows older than the
        node's period are deleted, then the table is trimmed to the newest `count` rows.
        """
        with self._lock:
            _c_sub = self._conn.cursor()

            table = self._get_table_name(node_id)

            # insert the data change into the database
            try:
                _c_sub.execute('INSERT INTO "{tn}" VALUES (NULL, ?, ?, ?, ?, ?, ?)'.format(tn=table),
                               (
                                   datavalue.ServerTimestamp,
                                   datavalue.SourceTimestamp,
                                   datavalue.StatusCode.value,
                                   str(datavalue.Value.Value),
                                   datavalue.Value.VariantType.name,
                                   datavalue.Value.to_binary()
                               )
                               )
            except sqlite3.Error as e:
                self.logger.error('Historizing SQL Insert Error for %s: %s', node_id, e)

            self._conn.commit()

            # get this node's limits; a node that was never registered has none
            # (the failed insert has already been logged above in that case)
            period, count = self._datachanges_period.get(node_id, (None, 0))

            if period:
                # after the insert, if a period was specified delete all records older than period
                # utcnow() matches the clock read_node_history() uses for its default end bound
                # NOTE(review): assumes the stored ServerTimestamp values are naive UTC - confirm
                date_limit = datetime.utcnow() - period

                try:
                    _c_sub.execute('DELETE FROM "{tn}" WHERE ServerTimestamp < ?'.format(tn=table),
                                   (date_limit.isoformat(' '),))
                except sqlite3.Error as e:
                    self.logger.error('Historizing SQL Delete Old Data Error for %s: %s', node_id, e)

                self._conn.commit()

            if count:
                # keep only the newest count rows; Id grows with every insert so it gives the insertion order
                try:
                    _c_sub.execute('DELETE FROM "{tn}" WHERE Id NOT IN '
                                   '(SELECT Id FROM "{tn}" ORDER BY Id DESC LIMIT ?)'.format(tn=table),
                                   (count,))
                except sqlite3.Error as e:
                    self.logger.error('Historizing SQL Delete Old Data Error for %s: %s', node_id, e)

                self._conn.commit()

    def read_node_history(self, node_id, start, end, nb_values):
        """
        Read stored values of a node whose ServerTimestamp lies between start and end.

        node_id: node to read
        start: lower time bound; None or ua.DateTimeMinValue means unspecified and selects newest-first order
        end: upper time bound; None or ua.DateTimeMinValue means one day after the current UTC time
        nb_values: maximum number of values to return; falsy means no limit

        If start is later than end the bounds are swapped and values are returned newest first.

        Returns (results, cont): a list of ua.DataValue and a continuation point. cont is the
        ServerTimestamp of the first value that did not fit into nb_values, or None.
        SQLite errors are logged, and whatever was read before the error is returned.
        """
        with self._lock:
            _c_read = self._conn.cursor()

            order = "ASC"

            if start is None or start == ua.DateTimeMinValue:
                order = "DESC"
                start = ua.DateTimeMinValue

            if end is None or end == ua.DateTimeMinValue:
                end = datetime.utcnow() + timedelta(days=1)

            if start < end:
                start_time = start.isoformat(' ')
                end_time = end.isoformat(' ')
            else:
                order = "DESC"
                start_time = end.isoformat(' ')
                end_time = start.isoformat(' ')

            if nb_values:
                limit = nb_values + 1  # add 1 to the number of values for retrieving a continuation point
            else:
                limit = -1  # in SQLite a LIMIT of -1 returns all results

            table = self._get_table_name(node_id)

            cont = None
            results = []

            # select values from the database ordered by insertion (Id); recreate UA Variant from binary
            try:
                for row in _c_read.execute('SELECT * FROM "{tn}" WHERE "ServerTimestamp" BETWEEN ? AND ? '
                                           'ORDER BY "Id" {dir} LIMIT ?'.format(tn=table, dir=order), (start_time, end_time, limit,)):
                    # columns: 0 Id, 1 ServerTimestamp, 2 SourceTimestamp, 3 StatusCode, 4 Value, 5 VariantType, 6 VariantBinary
                    dv = ua.DataValue(ua.Variant.from_binary(Buffer(row[6])))
                    dv.ServerTimestamp = row[1]
                    dv.SourceTimestamp = row[2]
                    dv.StatusCode = ua.StatusCode(row[3])

                    results.append(dv)

            except sqlite3.Error as e:
                self.logger.error('Historizing SQL Read Error for %s: %s', node_id, e)

            if nb_values:
                # the extra row fetched above becomes the continuation point, but only when a start time was given
                if start > ua.DateTimeMinValue and len(results) > nb_values:
                    cont = results[nb_values].ServerTimestamp

                results = results[:nb_values]

            return results, cont

    def new_historized_event(self, event, period):
        """Event history is not supported by this backend."""
        raise NotImplementedError

    def save_event(self, event):
        """Event history is not supported by this backend."""
        raise NotImplementedError

    def read_event_history(self, start, end, evfilter):
        """Event history is not supported by this backend."""
        raise NotImplementedError

    def _get_table_name(self, node_id):
        """Return the table name for a node: '<NamespaceIndex>_<Identifier>'."""
        return str(node_id.NamespaceIndex) + '_' + str(node_id.Identifier)

    def stop(self):
        """Close the database connection, waiting for any running operation to release the lock."""
        with self._lock:
            self._conn.close()
155
156