Completed
Pull Request — master (#65)
by
unknown
02:13
created

  A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 3
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
import logging
2
import aiosqlite
3
from typing import Iterable, Optional
4
from datetime import timedelta
5
from datetime import datetime
6
from asyncio import get_event_loop
7
import sqlite3
8
9
from asyncua import ua
10
from ..ua.ua_binary import variant_from_binary, variant_to_binary
11
from ..common.utils import Buffer
12
from ..common.events import Event, get_event_properties_from_type_node
13
from .history import HistoryStorageInterface
14
15
16
class HistorySQLite(HistoryStorageInterface):
17
    """
18
    history backend which stores data values and object events in a SQLite database
19
    this backend is intended to only be accessed via OPC UA, therefore all UA Variants saved in
20
    the history database are in binary format (SQLite BLOBs)
21
    note that PARSE_DECLTYPES is active so certain data types (such as datetime) will not be BLOBs
22
    """
23
24
    def __init__(self, path="history.db", loop=None):
25
        self.logger = logging.getLogger(__name__)
26
        self._datachanges_period = {}
27
        self._db_file = path
28
        self._event_fields = {}
29
        self._db: aiosqlite.Connection = None
30
        self._loop = loop or get_event_loop()
31
32
    async def init(self):
        """
        Open the connection to the SQLite database file

        PARSE_DECLTYPES is enabled so that columns declared as TIMESTAMP are converted back to
        datetime objects when rows are read; without it the timestamps of stored data values
        come back as plain strings
        """
        self._db = await aiosqlite.connect(
            self._db_file,
            loop=self._loop,
            detect_types=sqlite3.PARSE_DECLTYPES,
        )
34
35
    async def stop(self):
36
        await self._db.close()
37
        self.logger.info('Historizing SQL connection closed')
38
39
    async def new_historized_node(self, node_id, period, count=0):
        """
        Register a node for historizing and create the table that stores its data changes

        Args:
            node_id: Id of the node, used to derive the table name
            period: Retention period subtracted from the current time when saving, falsy keeps all records
            count: Maximum number of records kept when saving, falsy means no limit

        SQLite errors raised while creating the table are logged at info level and not raised
        """
        table = self._get_table_name(node_id)
        self._datachanges_period[node_id] = period, count
        # one column per attribute of the DataValue object
        # note: Value/VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
        column_defs = (
            '_Id INTEGER PRIMARY KEY NOT NULL,'
            ' ServerTimestamp TIMESTAMP,'
            ' SourceTimestamp TIMESTAMP,'
            ' StatusCode INTEGER,'
            ' Value TEXT,'
            ' VariantType TEXT,'
            ' VariantBinary BLOB'
        )
        statement = f'CREATE TABLE "{table}" ({column_defs})'
        try:
            await self._db.execute(statement, None)
            await self._db.commit()
        except sqlite3.Error as err:
            self.logger.info("Historizing SQL Table Creation Error for %s: %s", node_id, err)
56
57
    async def execute_sql_delete(self, condition: str, args: Iterable, table: str, node_id):
        """
        Delete the rows of a table that match a WHERE condition and commit the change

        Args:
            condition: SQL expression placed after WHERE, may contain '?' placeholders
            args: Values bound to the placeholders of the statement
            table: Name of the table to delete from
            node_id: Only used to identify the node in the error log

        SQLite errors are logged and not raised
        """
        statement = f'DELETE FROM "{table}" WHERE {condition}'
        try:
            await self._db.execute(statement, args)
            await self._db.commit()
        except sqlite3.Error as err:
            self.logger.error("Historizing SQL Delete Old Data Error for %s: %s", node_id, err)
63
64
    async def save_node_value(self, node_id, datavalue):
        """
        Store a data change of a historized node and apply the retention settings of the node

        Args:
            node_id: Id of the node the value belongs to, must have been registered for historizing
            datavalue: The DataValue to store

        SQLite errors are logged and not raised
        """
        table = self._get_table_name(node_id)
        insert_sql = f'INSERT INTO "{table}" VALUES (NULL, ?, ?, ?, ?, ?, ?)'
        try:
            # the text columns are for human reading, the binary variant is what gets read back
            record = (
                datavalue.ServerTimestamp,
                datavalue.SourceTimestamp,
                datavalue.StatusCode.value,
                str(datavalue.Value.Value),
                datavalue.Value.VariantType.name,
                sqlite3.Binary(variant_to_binary(datavalue.Value)),
            )
            await self._db.execute(insert_sql, record)
            await self._db.commit()
        except sqlite3.Error as err:
            self.logger.error("Historizing SQL Insert Error for %s: %s", node_id, err)
        # retention settings stored when the node was registered
        period, count = self._datachanges_period[node_id]
        if period:
            # drop every record older than the retention period
            oldest_allowed = datetime.utcnow() - period
            await self.execute_sql_delete("SourceTimestamp < ?", (oldest_allowed,), table, node_id)
        if count:
            # when more than count records are stored, drop the record(s) with the oldest source timestamp
            over_limit_condition = (
                'SourceTimestamp = (SELECT CASE WHEN COUNT(*) > ? '
                f'THEN MIN(SourceTimestamp) ELSE NULL END FROM "{table}")'
            )
            await self.execute_sql_delete(over_limit_condition, (count,), table, node_id)
90
91
    async def read_node_history(self, node_id, start, end, nb_values):
        """
        Read the stored data values of a node between two points in time

        Args:
            node_id: Id of the historized node
            start: Start of the time range, see _get_bounds
            end: End of the time range, see _get_bounds
            nb_values: Maximum number of values to return, falsy returns all values

        Returns: List of DataValues, SourceTimestamp of the first value that did not fit into
                 nb_values (None when there is none)
        """
        table = self._get_table_name(node_id)
        start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)
        query = (
            f'SELECT * FROM "{table}" WHERE "SourceTimestamp" BETWEEN ? AND ? '
            f'ORDER BY "_Id" {order} LIMIT ?'
        )
        datavalues = []
        try:
            async with self._db.execute(query, (start_time, end_time, limit,)) as cursor:
                async for row in cursor:
                    # recreate the UA Variant from its binary form and rebuild the data value object
                    datavalue = ua.DataValue(variant_from_binary(Buffer(row[6])))
                    datavalue.ServerTimestamp = row[1]
                    datavalue.SourceTimestamp = row[2]
                    datavalue.StatusCode = ua.StatusCode(row[3])
                    datavalues.append(datavalue)
        except sqlite3.Error as err:
            self.logger.error("Historizing SQL Read Error for %s: %s", node_id, err)
        if not nb_values:
            return datavalues, None
        # one value more than requested was queried, it only serves as continuation point
        cont = datavalues[nb_values].SourceTimestamp if len(datavalues) > nb_values else None
        return datavalues[:nb_values], cont
115
116
    async def new_historized_event(self, source_id, evtypes, period, count=0):
        """
        Register an event source for historizing and create the table that stores its events

        Args:
            source_id: Id of the node emitting the events, used to derive the table name
            evtypes: List of event type nodes whose fields become table columns
            period: Retention period applied when events are saved, falsy keeps all events
            count: Accepted but not used by this method

        SQLite errors raised while creating the table are logged at info level and not raised
        """
        # collect the fields of all event types, they are needed later to validate select clauses
        ev_fields = await self._get_event_fields(evtypes)
        self._datachanges_period[source_id] = period
        self._event_fields[source_id] = ev_fields
        table = self._get_table_name(source_id)
        field_columns = self._get_event_columns(ev_fields)
        # note that _Timestamp is for SQL query, _EventTypeName is for debugging, be careful not to create event
        # properties with these names
        statement = (
            f'CREATE TABLE "{table}" '
            f'(_Id INTEGER PRIMARY KEY NOT NULL, _Timestamp TIMESTAMP, _EventTypeName TEXT, {field_columns})'
        )
        try:
            await self._db.execute(statement, None)
            await self._db.commit()
        except sqlite3.Error as err:
            self.logger.info("Historizing SQL Table Creation Error for events from %s: %s", source_id, err)
135
136
    async def save_event(self, event):
        """
        Store an event of a historized event source and purge events older than the retention period

        Args:
            event: The event to store; its SourceNode must have been registered for historizing

        SQLite errors are logged and not raised
        """
        table = self._get_table_name(event.SourceNode)
        columns, placeholders, evtup = self._format_event(event)
        # _Timestamp and _EventTypeName (useful for troubleshooting database) are bound as text
        # parameters instead of being pasted into the statement: a double quoted value is an
        # identifier for SQLite and only falls back to a string literal in lenient builds
        helper_values = (str(event.Time), str(event.EventType))
        # insert the event into the database
        try:
            await self._db.execute(
                f'INSERT INTO "{table}" ("_Id", "_Timestamp", "_EventTypeName", {columns}) '
                f'VALUES (NULL, ?, ?, {placeholders})',
                helper_values + evtup
            )
            await self._db.commit()
        except sqlite3.Error as e:
            self.logger.error("Historizing SQL Insert Error for events from %s: %s", event.SourceNode, e)
        # get this node's period from the period dict and calculate the limit
        period = self._datachanges_period[event.SourceNode]
        if period:
            # after the insert, if a period was specified delete all records older than period
            date_limit = datetime.utcnow() - period
            try:
                # compare against _Timestamp: the event field columns hold binary variants (BLOBs),
                # which never sort below a text date, so filtering on them would not delete anything
                await self._db.execute(
                    f'DELETE FROM "{table}" WHERE "_Timestamp" < ?', (date_limit.isoformat(' '),)
                )
                await self._db.commit()
            except sqlite3.Error as e:
                self.logger.error("Historizing SQL Delete Old Data Error for events from %s: %s", event.SourceNode, e)
160
161
    async def read_event_history(self, source_id, start, end, nb_values, evfilter):
        """
        Read the stored events of an event source between two points in time

        Args:
            source_id: Id of the node that emitted the events
            start: Start of the time range, see _get_bounds
            end: End of the time range, see _get_bounds
            nb_values: Maximum number of events to return, falsy returns all events
            evfilter: Event filter whose select clauses determine the fields that are read

        Returns: List of events, _Timestamp of the first event that did not fit into nb_values
                 (None when there is none)
        """
        table = self._get_table_name(source_id)
        start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)
        clauses, clauses_str = self._get_select_clauses(source_id, evfilter)
        # the SQL select clause is built from the EventFilter and the available fields
        query = (
            f'SELECT "_Timestamp", {clauses_str} FROM "{table}" '
            f'WHERE "_Timestamp" BETWEEN ? AND ? ORDER BY "_Id" {order} LIMIT ?'
        )
        timestamps = []
        events = []
        try:
            async with self._db.execute(query, (start_time, end_time, limit)) as cursor:
                async for row in cursor:
                    timestamps.append(row[0])
                    # the columns after _Timestamp come back in the order of the select clauses
                    fields = {
                        name: ua.Variant(None) if blob is None else variant_from_binary(Buffer(blob))
                        for name, blob in zip(clauses, row[1:])
                    }
                    events.append(Event.from_field_dict(fields))
        except sqlite3.Error as err:
            self.logger.error("Historizing SQL Read Error events for node %s: %s", source_id, err)
        if not nb_values:
            return events, None
        # one event more than requested was queried, it only serves as continuation point
        cont = timestamps[nb_values] if len(events) > nb_values else None
        return events[:nb_values], cont
190
191
    def _get_table_name(self, node_id):
192
        return f"{node_id.NamespaceIndex}_{node_id.Identifier}"
193
194
    async def _get_event_fields(self, evtypes):
        """
        Collect the names of all fields of the event types that are to be historized

        Args:
            evtypes: List of event type nodes

        Returns: List with the display name of every distinct field of all event types
        """
        properties = []
        for event_type in evtypes:
            properties += await get_event_properties_from_type_node(event_type)
        # the set drops properties that were collected more than once
        return [(await prop.get_display_name()).Text for prop in set(properties)]
210
211
    @staticmethod
    def _get_bounds(start, end, nb_values):
        """
        Translate a history read request into the bounds, sort order and limit of the SQL query

        Args:
            start: Start time, None or the windows epoch mean "not specified"
            end: End time, None or the windows epoch mean "not specified"
            nb_values: Requested number of values, falsy means all values

        Returns: Lower bound and upper bound as ISO formatted strings, "ASC" or "DESC", LIMIT value
        """
        epoch = ua.get_win_epoch()
        start_unset = start is None or start == epoch
        if start_unset:
            start = epoch
        if end is None or end == epoch:
            # a missing end reaches one day past the current time
            end = datetime.utcnow() + timedelta(days=1)
        in_order = start < end
        lower, upper = (start, end) if in_order else (end, start)
        # newest first when no start was given or when the bounds were passed in reverse
        order = "ASC" if in_order and not start_unset else "DESC"
        # one extra value is fetched for retrieving a continuation point;
        # in SQLite a LIMIT of -1 returns all results
        limit = nb_values + 1 if nb_values else -1
        return lower.isoformat(" "), upper.isoformat(" "), order, limit
231
232
    def _format_event(self, event):
        """
        Turn an event into the pieces needed for the SQL insert statement

        Args:
            event: The event returned by the subscription

        Returns: String of quoted column names, String of '?' placeholders, Tuple of variant binaries
                 (all three in alphabetical order of the field names)
        """
        fields = event.get_event_props_as_fields_dict()
        # alphabetical order keeps column names and values in step
        names = sorted(fields)
        # the variants are converted to binary objects for storing in SQL BLOB format
        binaries = tuple(sqlite3.Binary(variant_to_binary(fields[name])) for name in names)
        placeholders = ["?"] * len(names)
        return self._list_to_sql_str(names), self._list_to_sql_str(placeholders, False), binaries
253
254
    def _get_event_columns(self, ev_fields):
255
        fields = []
256
        for field in ev_fields:
257
            fields.append(field + " BLOB")
258
        return self._list_to_sql_str(fields, False)
259
260
    def _get_select_clauses(self, source_id, evfilter):
261
        s_clauses = []
262
        for select_clause in evfilter.SelectClauses:
263
            try:
264
                if not select_clause.BrowsePath:
265
                    s_clauses.append(select_clause.Attribute.name)
266
                else:
267
                    name = select_clause.BrowsePath[0].Name
268
                    s_clauses.append(name)
269
            except AttributeError:
270
                self.logger.warning("Historizing SQL OPC UA Select Clause Warning for node %s,"
271
                                    " Clause: %s:", source_id, select_clause)
272
        # remove select clauses that the event type doesn't have; SQL will error because the column doesn't exist
273
        clauses = [x for x in s_clauses if x in self._event_fields[source_id]]
274
        return clauses, self._list_to_sql_str(clauses)
275
276
    @staticmethod
277
    def _list_to_sql_str(ls, quotes=True):
278
        items = [f'"{item}"' if quotes else str(item) for item in ls]
279
        return ", ".join(items)
280