Passed
Pull Request — master (#65)
by
unknown
02:23
created

  A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 3
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
import logging
2
import aiosqlite
3
import sqlite3
4
from typing import Iterable
5
from datetime import timedelta
6
from datetime import datetime
7
from asyncio import get_event_loop
8
9
from asyncua import ua
10
from ..ua.ua_binary import variant_from_binary, variant_to_binary
11
from ..common.utils import Buffer
12
from ..common.events import Event, get_event_properties_from_type_node
13
from .history import HistoryStorageInterface
14
15
16
class HistorySQLite(HistoryStorageInterface):
17
    """
18
    history backend which stores data values and object events in a SQLite database
19
    this backend is intended to only be accessed via OPC UA, therefore all UA Variants saved in
20
    the history database are in binary format (SQLite BLOBs)
21
    note that PARSE_DECLTYPES is active so certain data types (such as datetime) will not be BLOBs
22
    """
23
24
    def __init__(self, path="history.db", loop=None):
25
        self.logger = logging.getLogger(__name__)
26
        self._datachanges_period = {}
27
        self._db_file = path
28
        self._event_fields = {}
29
        self._db: aiosqlite.Connection = None
30
        self._loop = loop or get_event_loop()
31
32
    async def init(self):
        """
        Open the SQLite connection that every other method of this backend uses

        The connection is opened with sqlite3.PARSE_DECLTYPES as its detect_types setting
        """
        pending_connection = aiosqlite.connect(
            self._db_file,
            loop=self._loop,
            detect_types=sqlite3.PARSE_DECLTYPES,
        )
        self._db = await pending_connection
34
35
    async def stop(self):
36
        await self._db.close()
37
        self.logger.info('Historizing SQL connection closed')
38
39
    async def new_historized_node(self, node_id, period, count=0):
40
        table = self._get_table_name(node_id)
41
        self._datachanges_period[node_id] = period, count
42
        # create a table for the node which will store attributes of the DataValue object
43
        # note: Value/VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
44
        try:
45
            await self._db.execute(f'CREATE TABLE "{table}" (_Id INTEGER PRIMARY KEY NOT NULL,'
46
                                   ' ServerTimestamp TIMESTAMP,'
47
                                   ' SourceTimestamp TIMESTAMP,'
48
                                   ' StatusCode INTEGER,'
49
                                   ' Value TEXT,'
50
                                   ' VariantType TEXT,'
51
                                   ' VariantBinary BLOB)', None)
52
            await self._db.commit()
53
        except aiosqlite.Error as e:
54
            self.logger.info("Historizing SQL Table Creation Error for %s: %s", node_id, e)
55
56
    async def execute_sql_delete(self, condition: str, args: Iterable, table: str, node_id):
57
        try:
58
            await self._db.execute(f'DELETE FROM "{table}" WHERE {condition}', args)
59
            await self._db.commit()
60
        except aiosqlite.Error as e:
61
            self.logger.error("Historizing SQL Delete Old Data Error for %s: %s", node_id, e)
62
63
    async def save_node_value(self, node_id, datavalue):
        """
        Insert a data change into the node's table, then apply the node's retention settings

        Args:
            node_id: id of the historized node; must have been registered with new_historized_node
            datavalue: data value to store; its timestamps, status code and variant are written

        An insert error is logged at error level and not raised; the retention step still runs
        """
        table = self._get_table_name(node_id)
        insert_stmt = f'INSERT INTO "{table}" VALUES (NULL, ?, ?, ?, ?, ?, ?)'
        try:
            server_ts = datavalue.ServerTimestamp
            source_ts = datavalue.SourceTimestamp
            status = datavalue.StatusCode.value
            variant = datavalue.Value
            # the text columns are for human reading, the binary column carries the real variant
            record = (
                server_ts,
                source_ts,
                status,
                str(variant.Value),
                variant.VariantType.name,
                sqlite3.Binary(variant_to_binary(variant)),
            )
            await self._db.execute(insert_stmt, record)
            await self._db.commit()
        except aiosqlite.Error as err:
            self.logger.error("Historizing SQL Insert Error for %s: %s", node_id, err)
        # retention settings stored for this node when it was registered
        period, count = self._datachanges_period[node_id]
        if period:
            # drop every record older than the retention period
            cutoff = datetime.utcnow() - period
            await self.execute_sql_delete("SourceTimestamp < ?", (cutoff,), table, node_id)
        if count:
            # when more than count records exist, drop the ones carrying the oldest SourceTimestamp
            oldest_when_over_limit = (
                'SourceTimestamp = (SELECT CASE WHEN COUNT(*) > ? '
                f'THEN MIN(SourceTimestamp) ELSE NULL END FROM "{table}")'
            )
            await self.execute_sql_delete(oldest_when_over_limit, (count,), table, node_id)
89
90
    async def read_node_history(self, node_id, start, end, nb_values):
        """
        Read the stored data values of a node between two timestamps

        Args:
            node_id: id of the historized node
            start: start of the time range, passed to _get_bounds
            end: end of the time range, passed to _get_bounds
            nb_values: maximum number of values to return; falsy means no limit

        Returns: (list of data values, continuation point) where the continuation point is the
            SourceTimestamp of the first value left out, or None when nothing was left out

        A read error is logged at error level; whatever was read so far is returned
        """
        table = self._get_table_name(node_id)
        start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)
        query = (
            f'SELECT * FROM "{table}" WHERE "SourceTimestamp" BETWEEN ? AND ? '
            f'ORDER BY "_Id" {order} LIMIT ?'
        )
        history = []
        try:
            async with self._db.execute(query, (start_time, end_time, limit,)) as cursor:
                async for row in cursor:
                    # column 6 is the binary variant, 1/2 the timestamps, 3 the status code
                    data_value = ua.DataValue(variant_from_binary(Buffer(row[6])))
                    data_value.ServerTimestamp = row[1]
                    data_value.SourceTimestamp = row[2]
                    data_value.StatusCode = ua.StatusCode(row[3])
                    history.append(data_value)
        except aiosqlite.Error as err:
            self.logger.error("Historizing SQL Read Error for %s: %s", node_id, err)
        if not nb_values:
            return history, None
        # one extra row was requested; if it is there, it becomes the continuation point
        cont = history[nb_values].SourceTimestamp if len(history) > nb_values else None
        return history[:nb_values], cont
114
115
    async def new_historized_event(self, source_id, evtypes, period, count=0):
116
        # get all fields for the event type nodes
117
        ev_fields = await self._get_event_fields(evtypes)
118
        self._datachanges_period[source_id] = period
119
        self._event_fields[source_id] = ev_fields
120
        table = self._get_table_name(source_id)
121
        columns = self._get_event_columns(ev_fields)
122
        # create a table for the event which will store fields generated by the source object's events
123
        # note that _Timestamp is for SQL query, _EventTypeName is for debugging, be careful not to create event
124
        # properties with these names
125
        try:
126
            await self._db.execute(
127
                f'CREATE TABLE "{table}" '
128
                f'(_Id INTEGER PRIMARY KEY NOT NULL, _Timestamp TIMESTAMP, _EventTypeName TEXT, {columns})',
129
                None
130
            )
131
            await self._db.commit()
132
        except aiosqlite.Error as e:
133
            self.logger.info("Historizing SQL Table Creation Error for events from %s: %s", source_id, e)
134
135
    async def save_event(self, event):
136
        table = self._get_table_name(event.SourceNode)
137
        columns, placeholders, evtup = self._format_event(event)
138
        event_type = event.EventType  # useful for troubleshooting database
139
        # insert the event into the database
140
        try:
141
            await self._db.execute(
142
                f'INSERT INTO "{table}" ("_Id", "_Timestamp", "_EventTypeName", {columns}) '
143
                f'VALUES (NULL, "{event.Time}", "{event_type}", {placeholders})',
144
                evtup
145
            )
146
            await self._db.commit()
147
        except aiosqlite.Error as e:
148
            self.logger.error("Historizing SQL Insert Error for events from %s: %s", event.SourceNode, e)
149
        # get this node's period from the period dict and calculate the limit
150
        period = self._datachanges_period[event.SourceNode]
151
        if period:
152
            # after the insert, if a period was specified delete all records older than period
153
            date_limit = datetime.utcnow() - period
154
            try:
155
                await self._db.execute(f'DELETE FROM "{table}" WHERE Time < ?', (date_limit.isoformat(' '),))
156
                await self._db.commit()
157
            except aiosqlite.Error as e:
158
                self.logger.error("Historizing SQL Delete Old Data Error for events from %s: %s", event.SourceNode, e)
159
160
    async def read_event_history(self, source_id, start, end, nb_values, evfilter):
        """
        Read the stored events of a source node between two timestamps

        Args:
            source_id: id of the event source node
            start: start of the time range, passed to _get_bounds
            end: end of the time range, passed to _get_bounds
            nb_values: maximum number of events to return; falsy means no limit
            evfilter: event filter whose select clauses decide which columns are read

        Returns: (list of events, continuation point) where the continuation point is the
            _Timestamp of the first event left out, or None when nothing was left out

        A read error is logged at error level; whatever was read so far is returned
        """
        table = self._get_table_name(source_id)
        start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)
        clauses, clauses_str = self._get_select_clauses(source_id, evfilter)
        # the select list is built from the EventFilter and the fields available for this source
        query = (
            f'SELECT "_Timestamp", {clauses_str} FROM "{table}" '
            f'WHERE "_Timestamp" BETWEEN ? AND ? ORDER BY "_Id" {order} LIMIT ?'
        )
        events = []
        timestamps = []
        try:
            async with self._db.execute(query, (start_time, end_time, limit)) as cursor:
                async for row in cursor:
                    timestamps.append(row[0])
                    # columns after the timestamp line up with clauses; NULL becomes an empty variant
                    fields = {
                        clause: ua.Variant(None) if blob is None else variant_from_binary(Buffer(blob))
                        for clause, blob in zip(clauses, row[1:])
                    }
                    events.append(Event.from_field_dict(fields))
        except aiosqlite.Error as err:
            self.logger.error("Historizing SQL Read Error events for node %s: %s", source_id, err)
        if not nb_values:
            return events, None
        # one extra row was requested; if it is there, its timestamp is the continuation point
        cont = timestamps[nb_values] if len(events) > nb_values else None
        return events[:nb_values], cont
189
190
    def _get_table_name(self, node_id):
191
        return f"{node_id.NamespaceIndex}_{node_id.Identifier}"
192
193
    async def _get_event_fields(self, evtypes):
194
        """
195
        Get all fields from the event types that are to be historized
196
        Args:
197
            evtypes: List of event type nodes
198
199
        Returns: List of fields for all event types
200
        """
201
        # get all fields from the event types that are to be historized
202
        ev_aggregate_fields = []
203
        for event_type in evtypes:
204
            ev_aggregate_fields.extend((await get_event_properties_from_type_node(event_type)))
205
        ev_fields = []
206
        for field in set(ev_aggregate_fields):
207
            ev_fields.append((await field.get_display_name()).Text)
208
        return ev_fields
209
210
    @staticmethod
    def _get_bounds(start, end, nb_values):
        """
        Turn the requested time range into the values needed by the SQL history queries

        Args:
            start: start time; None or the value of ua.get_win_epoch() means "not given"
            end: end time; None or the value of ua.get_win_epoch() means "not given"
            nb_values: maximum number of values wanted; falsy means no limit

        Returns: (lower bound, upper bound, "ASC" or "DESC", SQL LIMIT value); both bounds are
            strings produced with isoformat(" ")
        """
        descending = False
        if start is None or start == ua.get_win_epoch():
            # no start given: read backwards, beginning at the epoch
            descending = True
            start = ua.get_win_epoch()
        if end is None or end == ua.get_win_epoch():
            # no end given: use a bound one day after the current UTC time
            end = datetime.utcnow() + timedelta(days=1)
        if not start < end:
            # bounds are handed over lowest first, the direction is expressed by the order
            descending = True
            start, end = end, start
        # one extra row is requested to obtain a continuation point; -1 means "all rows" in SQLite
        limit = nb_values + 1 if nb_values else -1
        return start.isoformat(" "), end.isoformat(" "), "DESC" if descending else "ASC", limit
230
231
    def _format_event(self, event):
232
        """
233
        Convert an event object triggered by the subscription into ordered lists for the SQL insert string
234
235
        Args:
236
            event: The event returned by the subscription
237
238
        Returns: List of event fields (SQL column names), List of '?' placeholders, Tuple of variant binaries
239
        """
240
        placeholders = []
241
        ev_variant_binaries = []
242
        ev_variant_dict = event.get_event_props_as_fields_dict()
243
        names = list(ev_variant_dict.keys())
244
        names.sort()  # sort alphabetically since dict is not sorted
245
        # split dict into two synchronized lists which will be converted to SQL strings
246
        # note that the variants are converted to binary objects for storing in SQL BLOB format
247
        for name in names:
248
            variant = ev_variant_dict[name]
249
            placeholders.append("?")
250
            ev_variant_binaries.append(sqlite3.Binary(variant_to_binary(variant)))
251
        return self._list_to_sql_str(names), self._list_to_sql_str(placeholders, False), tuple(ev_variant_binaries)
252
253
    def _get_event_columns(self, ev_fields):
254
        fields = []
255
        for field in ev_fields:
256
            fields.append(field + " BLOB")
257
        return self._list_to_sql_str(fields, False)
258
259
    def _get_select_clauses(self, source_id, evfilter):
260
        s_clauses = []
261
        for select_clause in evfilter.SelectClauses:
262
            try:
263
                if not select_clause.BrowsePath:
264
                    s_clauses.append(select_clause.Attribute.name)
265
                else:
266
                    name = select_clause.BrowsePath[0].Name
267
                    s_clauses.append(name)
268
            except AttributeError:
269
                self.logger.warning("Historizing SQL OPC UA Select Clause Warning for node %s,"
270
                                    " Clause: %s:", source_id, select_clause)
271
        # remove select clauses that the event type doesn't have; SQL will error because the column doesn't exist
272
        clauses = [x for x in s_clauses if x in self._event_fields[source_id]]
273
        return clauses, self._list_to_sql_str(clauses)
274
275
    @staticmethod
276
    def _list_to_sql_str(ls, quotes=True):
277
        items = [f'"{item}"' if quotes else str(item) for item in ls]
278
        return ", ".join(items)
279