Completed
Push — master ( 3bacce...4da3e7 )
by Olivier
02:00
created

HistorySQLite._execute_sql()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 3
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
import logging
2
from typing import Iterable, Coroutine, Optional
3
from datetime import timedelta
4
from datetime import datetime
5
from asyncio import get_event_loop
6
import sqlite3
7
8
from asyncua import ua
9
from ..ua.ua_binary import variant_from_binary, variant_to_binary
10
from ..common import Buffer, Event, get_event_properties_from_type_node
11
from .history import HistoryStorageInterface
12
13
__all__ = ["HistorySQLite"]
14
15
16
class HistorySQLite(HistoryStorageInterface):
17
    """
18
    history backend which stores data values and object events in a SQLite database
19
    this backend is intended to only be accessed via OPC UA, therefore all UA Variants saved in
20
    the history database are in binary format (SQLite BLOBs)
21
    note that PARSE_DECLTYPES is active so certain data types (such as datetime) will not be BLOBs
22
    """
23
24
    def __init__(self, path="history.db", loop=None):
25
        self.logger = logging.getLogger(__name__)
26
        self._datachanges_period = {}
27
        self._db_file = path
28
        self._event_fields = {}
29
        self._conn: Optional[sqlite3.Connection] = None
30
        self._loop = loop or get_event_loop()
31
32
    async def init(self):
33
        self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
34
35
    async def stop(self):
36
        await self._loop.run_in_executor(None, self._conn.close)
37
        self._conn = None
38
        self.logger.info('Historizing SQL connection closed')
39
40
    async def _execute_sql(self, sql: str, params: Iterable = None):
41
        return await self._loop.run_in_executor(None, self._conn.execute, sql, params or ())
42
43
    async def new_historized_node(self, node_id, period, count=0):
44
        table = self._get_table_name(node_id)
45
        self._datachanges_period[node_id] = period, count
46
        # create a table for the node which will store attributes of the DataValue object
47
        # note: Value/VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
48
        try:
49
            await self._execute_sql(f'CREATE TABLE "{table}" (_Id INTEGER PRIMARY KEY NOT NULL,'
50
                                    ' ServerTimestamp TIMESTAMP,'
51
                                    ' SourceTimestamp TIMESTAMP,'
52
                                    ' StatusCode INTEGER,'
53
                                    ' Value TEXT,'
54
                                    ' VariantType TEXT,'
55
                                    ' VariantBinary BLOB)', None)
56
57
        except sqlite3.Error as e:
58
            self.logger.info("Historizing SQL Table Creation Error for %s: %s", node_id, e)
59
60
    async def execute_sql_delete(self, condition: str, args: Iterable, table: str, node_id):
61
        try:
62
            await self._execute_sql(f'DELETE FROM "{table}" WHERE {condition}', args)
63
        except sqlite3.Error as e:
64
            self.logger.error("Historizing SQL Delete Old Data Error for %s: %s", node_id, e)
65
66
    async def save_node_value(self, node_id, datavalue):
67
        table = self._get_table_name(node_id)
68
        # insert the data change into the database
69
        try:
70
            await self._execute_sql(f'INSERT INTO "{table}" VALUES (NULL, ?, ?, ?, ?, ?, ?)', (
71
                datavalue.ServerTimestamp,
72
                datavalue.SourceTimestamp,
73
                datavalue.StatusCode.value,
74
                str(datavalue.Value.Value),
75
                datavalue.Value.VariantType.name,
76
                sqlite3.Binary(variant_to_binary(datavalue.Value))
77
            ))
78
        except sqlite3.Error as e:
79
            self.logger.error("Historizing SQL Insert Error for %s: %s", node_id, e)
80
        # get this node's period from the period dict and calculate the limit
81
        period, count = self._datachanges_period[node_id]
82
        if period:
83
            # after the insert, if a period was specified delete all records older than period
84
            date_limit = datetime.utcnow() - period
85
            await self.execute_sql_delete("SourceTimestamp < ?", (date_limit,), table, node_id)
86
        if count:
87
            # ensure that no more than count records are stored for the specified node
88
            await self.execute_sql_delete(
89
                'SourceTimestamp = (SELECT CASE WHEN COUNT(*) > ? '
90
                f'THEN MIN(SourceTimestamp) ELSE NULL END FROM "{table}")', (count,), table, node_id)
91
92
    async def read_node_history(self, node_id, start, end, nb_values):
93
        
94
        table = self._get_table_name(node_id)
95
        start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)
96
        cont = None
97
        results = []
98
        # select values from the database; recreate UA Variant from binary
99
        try:
100
            rows = await self._execute_sql(
101
                    f'SELECT * FROM "{table}" WHERE "SourceTimestamp" BETWEEN ? AND ? '
102
                    f'ORDER BY "_Id" {order} LIMIT ?', (start_time, end_time, limit,)
103
            )
104
            for row in rows:
105
                # rebuild the data value object
106
                dv = ua.DataValue(variant_from_binary(Buffer(row[6])))
107
                dv.ServerTimestamp = row[1]
108
                dv.SourceTimestamp = row[2]
109
                dv.StatusCode = ua.StatusCode(row[3])
110
                results.append(dv)
111
112
        except sqlite3.Error as e:
113
            self.logger.error("Historizing SQL Read Error for %s: %s", node_id, e)
114
115
        if nb_values:
116
            if len(results) > nb_values:
117
                cont = results[nb_values].SourceTimestamp
118
            results = results[:nb_values]
119
        return results, cont
120
121
    async def new_historized_event(self, source_id, evtypes, period, count=0):
122
        # get all fields for the event type nodes
123
        ev_fields = await self._get_event_fields(evtypes)
124
        self._datachanges_period[source_id] = period
125
        self._event_fields[source_id] = ev_fields
126
        table = self._get_table_name(source_id)
127
        columns = self._get_event_columns(ev_fields)
128
        # create a table for the event which will store fields generated by the source object's events
129
        # note that _Timestamp is for SQL query, _EventTypeName is for debugging, be careful not to create event
130
        # properties with these names
131
        try:
132
            self._execute_sql(
133
                f'CREATE TABLE "{table}" '
134
                f'(_Id INTEGER PRIMARY KEY NOT NULL, _Timestamp TIMESTAMP, _EventTypeName TEXT, {columns})',
135
                None
136
            )
137
        except sqlite3.Error as e:
138
            self.logger.info("Historizing SQL Table Creation Error for events from %s: %s", source_id, e)
139
140
    async def save_event(self, event):
141
        table = self._get_table_name(event.SourceNode)
142
        columns, placeholders, evtup = self._format_event(event)
143
        event_type = event.EventType  # useful for troubleshooting database
144
        # insert the event into the database
145
        try:
146
            await self._execute_sql(
147
                f'INSERT INTO "{table}" ("_Id", "_Timestamp", "_EventTypeName", {columns}) '
148
                f'VALUES (NULL, "{event.Time}", "{event_type}", {placeholders})',
149
                evtup
150
            )
151
        except sqlite3.Error as e:
152
            self.logger.error("Historizing SQL Insert Error for events from %s: %s", event.SourceNode, e)
153
        # get this node's period from the period dict and calculate the limit
154
        period = self._datachanges_period[event.SourceNode]
155
        if period:
156
            # after the insert, if a period was specified delete all records older than period
157
            date_limit = datetime.utcnow() - period
158
            try:
159
                await self._execute_sql(f'DELETE FROM "{table}" WHERE Time < ?', (date_limit.isoformat(' '),))
160
            except sqlite3.Error as e:
161
                self.logger.error("Historizing SQL Delete Old Data Error for events from %s: %s", event.SourceNode, e)
162
163
    async def read_event_history(self, source_id, start, end, nb_values, evfilter):
164
        table = self._get_table_name(source_id)
165
        start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)
166
        clauses, clauses_str = self._get_select_clauses(source_id, evfilter)
167
        cont = None
168
        cont_timestamps = []
169
        results = []
170
        # select events from the database; SQL select clause is built from EventFilter and available fields
171
        try:
172
            for row in await self._execute_sql(
173
                    f'SELECT "_Timestamp", {clauses_str} FROM "{table}" '
174
                    f'WHERE "_Timestamp" BETWEEN ? AND ? ORDER BY "_Id" {order} LIMIT ?',
175
                    (start_time, end_time, limit)):
176
                fdict = {}
177
                cont_timestamps.append(row[0])
178
                for i, field in enumerate(row[1:]):
179
                    if field is not None:
180
                        fdict[clauses[i]] = variant_from_binary(Buffer(field))
181
                    else:
182
                        fdict[clauses[i]] = ua.Variant(None)
183
                results.append(Event.from_field_dict(fdict))
184
        except sqlite3.Error as e:
185
            self.logger.error("Historizing SQL Read Error events for node %s: %s", source_id, e)
186
        if nb_values:
187
            if len(results) > nb_values:  # start > ua.get_win_epoch() and
188
                cont = cont_timestamps[nb_values]
189
            results = results[:nb_values]
190
        return results, cont
191
192
    def _get_table_name(self, node_id):
193
        return f"{node_id.NamespaceIndex}_{node_id.Identifier}"
194
195
    async def _get_event_fields(self, evtypes):
196
        """
197
        Get all fields from the event types that are to be historized
198
        Args:
199
            evtypes: List of event type nodes
200
201
        Returns: List of fields for all event types
202
        """
203
        # get all fields from the event types that are to be historized
204
        ev_aggregate_fields = []
205
        for event_type in evtypes:
206
            ev_aggregate_fields.extend((await get_event_properties_from_type_node(event_type)))
207
        ev_fields = []
208
        for field in set(ev_aggregate_fields):
209
            ev_fields.append((await field.get_display_name()).Text)
210
        return ev_fields
211
212
    @staticmethod
213
    def _get_bounds(start, end, nb_values):
214
        order = "ASC"
215
        if start is None or start == ua.get_win_epoch():
216
            order = "DESC"
217
            start = ua.get_win_epoch()
218
        if end is None or end == ua.get_win_epoch():
219
            end = datetime.utcnow() + timedelta(days=1)
220
        if start < end:
221
            start_time = start.isoformat(" ")
222
            end_time = end.isoformat(" ")
223
        else:
224
            order = "DESC"
225
            start_time = end.isoformat(" ")
226
            end_time = start.isoformat(" ")
227
        if nb_values:
228
            limit = nb_values + 1  # add 1 to the number of values for retrieving a continuation point
229
        else:
230
            limit = -1  # in SQLite a LIMIT of -1 returns all results
231
        return start_time, end_time, order, limit
232
233
    def _format_event(self, event):
234
        """
235
        Convert an event object triggered by the subscription into ordered lists for the SQL insert string
236
237
        Args:
238
            event: The event returned by the subscription
239
240
        Returns: List of event fields (SQL column names), List of '?' placeholders, Tuple of variant binaries
241
        """
242
        placeholders = []
243
        ev_variant_binaries = []
244
        ev_variant_dict = event.get_event_props_as_fields_dict()
245
        names = list(ev_variant_dict.keys())
246
        names.sort()  # sort alphabetically since dict is not sorted
247
        # split dict into two synchronized lists which will be converted to SQL strings
248
        # note that the variants are converted to binary objects for storing in SQL BLOB format
249
        for name in names:
250
            variant = ev_variant_dict[name]
251
            placeholders.append("?")
252
            ev_variant_binaries.append(sqlite3.Binary(variant_to_binary(variant)))
253
        return self._list_to_sql_str(names), self._list_to_sql_str(placeholders, False), tuple(ev_variant_binaries)
254
255
    def _get_event_columns(self, ev_fields):
256
        fields = []
257
        for field in ev_fields:
258
            fields.append(field + " BLOB")
259
        return self._list_to_sql_str(fields, False)
260
261
    def _get_select_clauses(self, source_id, evfilter):
262
        s_clauses = []
263
        for select_clause in evfilter.SelectClauses:
264
            try:
265
                if not select_clause.BrowsePath:
266
                    s_clauses.append(select_clause.Attribute.name)
267
                else:
268
                    name = select_clause.BrowsePath[0].Name
269
                    s_clauses.append(name)
270
            except AttributeError:
271
                self.logger.warning("Historizing SQL OPC UA Select Clause Warning for node %s,"
272
                                    " Clause: %s:", source_id, select_clause)
273
        # remove select clauses that the event type doesn't have; SQL will error because the column doesn't exist
274
        clauses = [x for x in s_clauses if x in self._event_fields[source_id]]
275
        return clauses, self._list_to_sql_str(clauses)
276
277
    @staticmethod
278
    def _list_to_sql_str(ls, quotes=True):
279
        items = [f'"{item}"' if quotes else str(item) for item in ls]
280
        return ", ".join(items)
281
282