Completed
Pull Request — master (#156)
by
unknown
02:38
created

opcua.server.HistorySQLite.save_event()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 2
rs 10
1
import logging
2
from datetime import timedelta
3
from datetime import datetime
4
5
from opcua import ua
6
from opcua.common.utils import Buffer
7
from opcua.server.history import HistoryStorageInterface
8
9
import sqlite3
10
11
12
class HistorySQLite(HistoryStorageInterface):
13
    """
14
    very minimal history backend storing data in SQLite database
15
    """
16
17
    def __init__(self):
18
        self.logger = logging.getLogger('historySQL')
19
        self._datachanges_period = {}
20
        self._events = {}
21
        self._db_file = "history.db"
22
23
        self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
24
25
    def new_historized_node(self, node_id, period, count=0):
26
        _c_new = self._conn.cursor()
27
28
        table = self._get_table_name(node_id)
29
30
        self._datachanges_period[node_id] = period
31
32
        # create a table for the node which will store attributes of the DataValue object
33
        # note: Value and VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
34
        try:
35
            _c_new.execute('CREATE TABLE "{tn}" (ServerTimestamp TIMESTAMP,'
36
                           ' SourceTimestamp TIMESTAMP,'
37
                           ' StatusCode INTEGER,'
38
                           ' Value TEXT,'
39
                           ' VariantType TEXT,'
40
                           ' VariantBinary BLOB)'.format(tn=table))
41
42
        except sqlite3.Error as e:
43
            self.logger.info('Historizing SQL Table Creation Error for %s: %s', node_id, e)
44
45
        self._conn.commit()
46
47
    def save_node_value(self, node_id, datavalue):
48
        _c_sub = self._conn.cursor()
49
50
        table = self._get_table_name(node_id)
51
52
        # insert the data change into the database
53
        try:
54
            _c_sub.execute('INSERT INTO "{tn}" VALUES (?, ?, ?, ?, ?, ?)'.format(tn=table), (datavalue.ServerTimestamp,
55
                                                                                             datavalue.SourceTimestamp,
56
                                                                                             datavalue.StatusCode.value,
57
                                                                                             str(datavalue.Value.Value),
58
                                                                                             datavalue.Value.VariantType.name,
59
                                                                                             datavalue.Value.to_binary()))
60
        except sqlite3.Error as e:
61
            self.logger.error('Historizing SQL Insert Error for %s: %s', node_id, e)
62
63
        self._conn.commit()
64
65
        # get this node's period from the period dict and calculate the limit
66
        period = self._datachanges_period[node_id]
67
        date_limit = datetime.now() - period
68
69
        # after the insert, delete all values older than period
70
        try:
71
            _c_sub.execute('DELETE FROM "{tn}" WHERE ServerTimestamp < ?'.format(tn=table),
72
                                                                                (date_limit.isoformat(' '),))
73
        except sqlite3.Error as e:
74
            self.logger.error('Historizing SQL Delete Old Data Error for %s: %s', node_id, e)
75
76
        self._conn.commit()
77
78
    def read_node_history(self, node_id, start, end, nb_values):
79
        _c_read = self._conn.cursor()
80
81
        if end is None:
82
            end = datetime.now() + timedelta(days=1)
83
        if start is None:
84
            start = ua.DateTimeMinValue
85
86
        table = self._get_table_name(node_id)
87
88
        cont = None
89
        results = []
90
91
        start_time = start.isoformat(' ')
92
        end_time = end.isoformat(' ')
93
94
        # select values from the database; recreate UA Variant from binary
95
        try:
96
            for row in _c_read.execute('SELECT * FROM "{tn}" WHERE "ServerTimestamp" BETWEEN ? AND ? '
97
                                       'LIMIT ?'.format(tn=table), (start_time, end_time, nb_values,)):
98
99
                dv = ua.DataValue(ua.Variant.from_binary(Buffer(row[5])))
100
                dv.ServerTimestamp = row[0]
101
                dv.SourceTimestamp = row[1]
102
                dv.StatusCode = ua.StatusCode(row[2])
103
104
                results.append(dv)
105
106
        except sqlite3.Error as e:
107
            self.logger.error('Historizing SQL Read Error for %s: %s', node_id, e)
108
109
        return results, cont
110
111
    def new_historized_event(self, event, period):
112
        raise NotImplementedError
113
114
    def save_event(self, event):
115
        raise NotImplementedError
116
117
    def read_event_history(self, start, end, evfilter):
118
        raise NotImplementedError
119
120
    def _get_table_name(self, node_id):
121
        return str(node_id.NamespaceIndex) + '_' + str(node_id.Identifier)
122
123
    # close connections to the history database when the server stops
124
    def stop(self):
125
        self._conn.close()
126