Completed
Push — master ( eb5e78...c75f71 )
by Olivier
04:30
created

HistorySQLite._get_event_columns()   A

Complexity

Conditions 2

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
c 0
b 0
f 0
dl 0
loc 5
ccs 3
cts 3
cp 1
crap 2
rs 9.4285
1 1
import logging
2 1
from datetime import timedelta
3 1
from datetime import datetime
4 1
from threading import Lock
5 1
import sqlite3
6 1
7 1
from opcua import ua
8 1
from opcua.common.utils import Buffer
9
from opcua.common import events
10
from opcua.server.history import HistoryStorageInterface
11 1
12
13
class HistorySQLite(HistoryStorageInterface):
14
    """
15
    history backend which stores data values and object events in a SQLite database
16
    this backend is intended to only be accessed via OPC UA, therefore all UA Variants saved in
17
    the history database are in binary format (SQLite BLOBs)
18
    note that PARSE_DECLTYPES is active so certain data types (such as datetime) will not be BLOBs
19 1
    """
20 1
21 1
    def __init__(self, path="history.db"):
22 1
        self.logger = logging.getLogger(__name__)
23 1
        self._datachanges_period = {}
24 1
        self._db_file = path
25
        self._lock = Lock()
26 1
        self._event_fields = {}
27
28 1
        self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
29 1
30 1
    def new_historized_node(self, node_id, period, count=0):
31
        with self._lock:
32 1
            _c_new = self._conn.cursor()
33
34 1
            table = self._get_table_name(node_id)
35
36
            self._datachanges_period[node_id] = period, count
37
38 1
            # create a table for the node which will store attributes of the DataValue object
39 1
            # note: Value/VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
40
            try:
41
                _c_new.execute('CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL,'
42
                               ' ServerTimestamp TIMESTAMP,'
43
                               ' SourceTimestamp TIMESTAMP,'
44
                               ' StatusCode INTEGER,'
45
                               ' Value TEXT,'
46
                               ' VariantType TEXT,'
47
                               ' VariantBinary BLOB)'.format(tn=table))
48
49
            except sqlite3.Error as e:
50 1
                self.logger.info('Historizing SQL Table Creation Error for %s: %s', node_id, e)
51
52 1
            self._conn.commit()
53 1
54 1
    def save_node_value(self, node_id, datavalue):
55
        with self._lock:
56 1
            _c_sub = self._conn.cursor()
57
58
            table = self._get_table_name(node_id)
59 1
60 1
            # insert the data change into the database
61
            try:
62
                _c_sub.execute('INSERT INTO "{tn}" VALUES (NULL, ?, ?, ?, ?, ?, ?)'.format(tn=table),
63
                               (
64
                                   datavalue.ServerTimestamp,
65
                                   datavalue.SourceTimestamp,
66
                                   datavalue.StatusCode.value,
67
                                   str(datavalue.Value.Value),
68
                                   datavalue.Value.VariantType.name,
69
                                   sqlite3.Binary(datavalue.Value.to_binary())
70
                               )
71
                              )
72
            except sqlite3.Error as e:
73 1
                self.logger.error('Historizing SQL Insert Error for %s: %s', node_id, e)
74
75
            self._conn.commit()
76 1
77
            # get this node's period from the period dict and calculate the limit
78 1
            period, count = self._datachanges_period[node_id]
79
80
            def execute_sql_delete(condition, args):
81
                query = ('DELETE FROM "{tn}" WHERE ' + condition).format(tn=table)
82
83
                try:
84
                    _c_sub.execute(query, args)
85
                except sqlite3.Error as e:
86
                    self.logger.error('Historizing SQL Delete Old Data Error for %s: %s', node_id, e)
87
88
                self._conn.commit()
89
90 1
            if period:
91 1
                # after the insert, if a period was specified delete all records older than period
92 1
                date_limit = datetime.utcnow() - period
93
                execute_sql_delete('ServerTimestamp < ?', (date_limit,))
94 1
95
            if count:
96 1
                # ensure that no more than count records are stored for the specified node
97 1
                execute_sql_delete('ServerTimestamp = (SELECT CASE WHEN COUNT(*) > ? '
98 1
                                   'THEN MIN(ServerTimestamp) ELSE NULL END FROM "{tn}")', (count,))
99
100 1
    def read_node_history(self, node_id, start, end, nb_values):
        """
        Read stored data values of a node

        Args:
            node_id: id of the historized node
            start: start of the requested range, handed to _get_bounds
            end: end of the requested range, handed to _get_bounds
            nb_values: maximum number of values to return; a falsy value returns everything found

        Returns: tuple of (list of DataValue objects, continuation point or None)
        """
        with self._lock:
            cursor = self._conn.cursor()

            table = self._get_table_name(node_id)
            start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)

            query = ('SELECT * FROM "{tn}" WHERE "ServerTimestamp" BETWEEN ? AND ? '
                     'ORDER BY "_Id" {dir} LIMIT ?').format(tn=table, dir=order)

            history = []
            try:
                for record in cursor.execute(query, (start_time, end_time, limit)):
                    # rebuild the data value object; column 6 holds the UA Variant in binary form
                    value = ua.DataValue(ua.Variant.from_binary(Buffer(record[6])))
                    value.ServerTimestamp = record[1]
                    value.SourceTimestamp = record[2]
                    value.StatusCode = ua.StatusCode(record[3])
                    history.append(value)
            except sqlite3.Error as e:
                self.logger.error('Historizing SQL Read Error for %s: %s', node_id, e)

            continuation = None
            if nb_values:
                # _get_bounds asked for one extra row; if it exists, its timestamp is the continuation point
                if len(history) > nb_values:
                    continuation = history[nb_values].ServerTimestamp
                history = history[:nb_values]

            return history, continuation
134
135
    def new_historized_event(self, source_id, evtypes, period, count=0):
136
        with self._lock:
137
            _c_new = self._conn.cursor()
138 1
139 1
            # get all fields for the event type nodes
140 1
            ev_fields = self._get_event_fields(evtypes)
141
142 1
            self._datachanges_period[source_id] = period
143
            self._event_fields[source_id] = ev_fields
144 1
145
            table = self._get_table_name(source_id)
146 1
            columns = self._get_event_columns(ev_fields)
147 1
148 1
            # create a table for the event which will store fields generated by the source object's events
149
            # note that _Timestamp is for SQL query, _EventTypeName is for debugging, be careful not to create event
150 1
            # properties with these names
151 1
            try:
152
                _c_new.execute(
153 1
                    'CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL, _Timestamp TIMESTAMP, _EventTypeName TEXT, {co})'
154 1
                    .format(tn=table, co=columns))
155
156
            except sqlite3.Error as e:
157
                self.logger.info('Historizing SQL Table Creation Error for events from %s: %s', source_id, e)
158
159 1
            self._conn.commit()
160 1
161
    def save_event(self, event):
162
        with self._lock:
163
            _c_sub = self._conn.cursor()
164
165
            table = self._get_table_name(event.SourceNode)
166
            columns, placeholders, evtup = self._format_event(event)
167
            event_type = event.EventType  # useful for troubleshooting database
168 1
169
            # insert the event into the database
170 1
            try:
171 1
                _c_sub.execute(
172 1
                    'INSERT INTO "{tn}" ("_Id", "_Timestamp", "_EventTypeName", {co}) VALUES (NULL, "{ts}", "{et}", {pl})'
173
                    .format(tn=table, co=columns, ts=event.Time, et=event_type, pl=placeholders), evtup)
174 1
175 1
            except sqlite3.Error as e:
176 1
                self.logger.error('Historizing SQL Insert Error for events from %s: %s', event.SourceNode, e)
177
178
            self._conn.commit()
179 1
180 1
            # get this node's period from the period dict and calculate the limit
181
            period = self._datachanges_period[event.SourceNode]
182
183
            if period:
184
                # after the insert, if a period was specified delete all records older than period
185
                date_limit = datetime.now() - period
186 1
187
                try:
188
                    _c_sub.execute('DELETE FROM "{tn}" WHERE Time < ?'.format(tn=table),
189 1
                                   (date_limit.isoformat(' '),))
190
                except sqlite3.Error as e:
191 1
                    self.logger.error('Historizing SQL Delete Old Data Error for events from %s: %s',
192
                                      event.SourceNode, e)
193
194
                self._conn.commit()
195
196
    def read_event_history(self, source_id, start, end, nb_values, evfilter):
        """
        Read stored events of a source node

        Args:
            source_id: id of the node whose events were historized
            start: start of the requested range, handed to _get_bounds
            end: end of the requested range, handed to _get_bounds
            nb_values: maximum number of events to return; a falsy value returns everything found
            evfilter: event filter whose select clauses decide which columns are read

        Returns: tuple of (list of Event objects, continuation point or None)
        """
        with self._lock:
            cursor = self._conn.cursor()

            table = self._get_table_name(source_id)
            start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)
            clauses, clauses_str = self._get_select_clauses(source_id, evfilter)

            # SQL select clause is built from EventFilter and available fields
            query = (
                'SELECT "_Timestamp", {cl} FROM "{tn}" WHERE "_Timestamp" BETWEEN ? AND ? ORDER BY "_Id" {dir} LIMIT ?'
            ).format(cl=clauses_str, tn=table, dir=order)

            timestamps = []
            found = []
            try:
                for record in cursor.execute(query, (start_time, end_time, limit)):
                    timestamps.append(record[0])

                    # columns after _Timestamp line up one to one with the select clauses
                    fields = {}
                    for name, blob in zip(clauses, record[1:]):
                        if blob is None:
                            fields[name] = ua.Variant(None)
                        else:
                            fields[name] = ua.Variant.from_binary(Buffer(blob))

                    found.append(events.Event.from_field_dict(fields))
            except sqlite3.Error as e:
                self.logger.error('Historizing SQL Read Error events for node %s: %s', source_id, e)

            continuation = None
            if nb_values:
                # _get_bounds asked for one extra row; if it exists, its timestamp is the continuation point
                if len(found) > nb_values:
                    continuation = timestamps[nb_values]
                found = found[:nb_values]

            return found, continuation
234 1
235 1
    def _get_table_name(self, node_id):
236
        return str(node_id.NamespaceIndex) + '_' + str(node_id.Identifier)
237
238 1
    def _get_event_fields(self, evtypes):
239 1
        """
240
        Get all fields from the event types that are to be historized
241
        Args:
242
            evtypes: List of event type nodes
243
244 1
        Returns: List of fields for all event types
245 1
246 1
        """
247
        # get all fields from the event types that are to be historized
248 1
        ev_aggregate_fields = []
249 1
        for event_type in evtypes:
250
            ev_aggregate_fields.extend((events.get_event_properties_from_type_node(event_type)))
251 1
252 1
        ev_fields = []
253
        for field in set(ev_aggregate_fields):
254
            ev_fields.append(field.get_display_name().Text.decode(encoding='utf-8'))
255 1
        return ev_fields
256
257 1
    @staticmethod
    def _get_bounds(start, end, nb_values):
        """
        Turn a requested history range into SQL query parameters

        Args:
            start: requested start time; None or ua.DateTimeMinValue means "not given"
            end: requested end time; None or ua.DateTimeMinValue means "not given"
            nb_values: requested number of values; falsy means "no limit"

        Returns: lower bound and upper bound as strings, sort order ("ASC"/"DESC"), row limit
        """
        # without a start the newest records are wanted first
        descending = start is None or start == ua.DateTimeMinValue
        if descending:
            start = ua.DateTimeMinValue

        # without an end, read up to one day past the current UTC time
        if end is None or end == ua.DateTimeMinValue:
            end = datetime.utcnow() + timedelta(days=1)

        # a reversed range is read backwards: swap the bounds so the lower one comes first
        if not start < end:
            descending = True
            start, end = end, start

        lower = start.isoformat(' ')
        upper = end.isoformat(' ')

        # one extra row is requested so the caller can derive a continuation point;
        # in SQLite a LIMIT of -1 returns all results
        limit = nb_values + 1 if nb_values else -1

        return lower, upper, "DESC" if descending else "ASC", limit
282 1
283 1
    def _format_event(self, event):
284 1
        """
285 1
        Convert an event object triggered by the subscription into ordered lists for the SQL insert string
286
287 1
        Args:
288
            event: The event returned by the subscription
289 1
290 1
        Returns: List of event fields (SQL column names), List of '?' placeholders, Tuple of variant binaries
291 1
292 1
        """
293 1
        placeholders = []
294
        ev_variant_binaries = []
295 1
296 1
        ev_variant_dict = event.get_event_props_as_fields_dict()
297 1
        names = list(ev_variant_dict.keys())
298 1
        names.sort()  # sort alphabetically since dict is not sorted
299 1
300
        # split dict into two synchronized lists which will be converted to SQL strings
301
        # note that the variants are converted to binary objects for storing in SQL BLOB format
302 1
        for name in names:
303 1
            variant = ev_variant_dict[name]
304
            placeholders.append('?')
305
            ev_variant_binaries.append(sqlite3.Binary(variant.to_binary()))
306
307
        return self._list_to_sql_str(names), self._list_to_sql_str(placeholders, False), tuple(ev_variant_binaries)
308
309 1
    def _get_event_columns(self, ev_fields):
310
        fields = []
311 1
        for field in ev_fields:
312
            fields.append(field + ' BLOB')
313 1
        return self._list_to_sql_str(fields, False)
314 1
315 1
    def _get_select_clauses(self, source_id, evfilter):
316
        s_clauses = []
317
        for select_clause in evfilter.SelectClauses:
318
            try:
319 1
                if not select_clause.BrowsePath:
320 1
                    s_clauses.append(select_clause.Attribute.name)
321 1
                else:
322 1
                    name = select_clause.BrowsePath[0].Name
323 1
                    s_clauses.append(name)
324
            except AttributeError:
325 1
                self.logger.warning('Historizing SQL OPC UA Select Clause Warning for node %s,'
326 1
                                    ' Clause: %s:', source_id, select_clause)
327
328 1
        # remove select clauses that the event type doesn't have; SQL will error because the column doesn't exist
329 1
        clauses = [x for x in s_clauses if x in self._event_fields[source_id]]
330 1
        return clauses, self._list_to_sql_str(clauses)
331 1
332
    @staticmethod
333
    def _list_to_sql_str(ls, quotes=True):
334
        sql_str = ''
335
        for item in ls:
336
            if quotes:
337
                sql_str += '"' + item + '", '
338
            else:
339
                sql_str += item + ', '
340
        return sql_str[:-2]  # remove trailing space and comma for SQL syntax
341
342
    def stop(self):
343
        with self._lock:
344
            self._conn.close()
345
            self.logger.info('Historizing SQL connection closed')
346