Completed
Pull Request — master (#156)
by
unknown
02:35
created

opcua.server.HistorySQLite.new_historized_event()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 2
rs 10
1
import logging
2
from datetime import timedelta
3
from datetime import datetime
4
5
from opcua import ua
6
from opcua.server.history import HistoryStorageInterface
7
8
import struct
9
import sqlite3
10
11
12
class HistorySQLite(HistoryStorageInterface):
13
    """
14
    very minimal history backend storing data in SQLite database
15
    """
16
17
    def __init__(self):
18
        self.logger = logging.getLogger('historySQL')
19
        self._datachanges_period = {}
20
        self._events = {}
21
        self._db_file = "history.db"
22
23
        self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
24
25
    def new_historized_node(self, node_id, period, count=0):
26
        _c_new = self._conn.cursor()
27
28
        table = self._get_table_name(node_id)
29
30
        self._datachanges_period[node_id] = period
31
32
        # create a table for the node which will store attributes of the DataValue object
33
        try:
34
            _c_new.execute('CREATE TABLE "{tn}" (ServerTimestamp TIMESTAMP,'
35
                           ' SourceTimestamp TIMESTAMP,'
36
                           ' StatusCode INTEGER,'
37
                           ' Value TEXT,'
38
                           ' VariantType INTEGER,'
39
                           ' ValueBinary BLOB)'.format(tn=table))
40
41
        except sqlite3.Error as e:
42
            self.logger.info('Historizing SQL Table Creation Error for %s: %s', node_id, e)
43
44
        self._conn.commit()
45
46
    def save_node_value(self, node_id, datavalue):
47
        _c_sub = self._conn.cursor()
48
49
        table = self._get_table_name(node_id)
50
51
        value_blob = self._pack_value(datavalue)
52
53
        # insert the data change into the database
54
        try:
55
            _c_sub.execute('INSERT INTO "{tn}" VALUES (?, ?, ?, ?, ?, ?)'.format(tn=table), (datavalue.ServerTimestamp,
56
                                                                                             datavalue.SourceTimestamp,
57
                                                                                             datavalue.StatusCode.value,
58
                                                                                             str(datavalue.Value.Value),
59
                                                                                             datavalue.Value.VariantType.value,
60
                                                                                             value_blob))
61
        except sqlite3.Error as e:
62
            self.logger.error('Historizing SQL Insert Error for %s: %s', node_id, e)
63
64
        self._conn.commit()
65
66
        # get this node's period from the period dict and calculate the limit
67
        period = self._datachanges_period[node_id]
68
        date_limit = datetime.now() - period
69
70
        # after the insert, delete all values older than period
71
        try:
72
            _c_sub.execute('DELETE FROM "{tn}" WHERE ServerTimestamp < ?'.format(tn=table),
73
                                                                                (date_limit.isoformat(' '),))
74
        except sqlite3.Error as e:
75
            self.logger.error('Historizing SQL Delete Old Data Error for %s: %s', node_id, e)
76
77
        self._conn.commit()
78
79
    def read_node_history(self, node_id, start, end, nb_values):
80
        _c_read = self._conn.cursor()
81
82
        if end is None:
83
            end = datetime.now() + timedelta(days=1)
84
        if start is None:
85
            start = ua.DateTimeMinValue
86
87
        table = self._get_table_name(node_id)
88
89
        cont = None
90
        results = []
91
92
        start_time = start.isoformat(' ')
93
        end_time = end.isoformat(' ')
94
95
        # select values from the database
96
        try:
97
            for row in _c_read.execute('SELECT * FROM "{tn}" WHERE "ServerTimestamp" BETWEEN ? AND ? '
98
                                       'LIMIT ?'.format(tn=table), (start_time, end_time, nb_values,)):
99
100
                variant_type = ua.VariantType(row[4])
101
                value = self._unpack_value(variant_type, row[5])
102
103
                dv = ua.DataValue(ua.Variant(value, variant_type))
104
                dv.ServerTimestamp = row[0]
105
                dv.SourceTimestamp = row[1]
106
                dv.StatusCode = ua.StatusCode(row[2])
107
108
                results.append(dv)
109
110
        except sqlite3.Error as e:
111
            self.logger.error('Historizing SQL Read Error for %s: %s', node_id, e)
112
113
        return results, cont
114
115
    def new_historized_event(self, event, period):
116
        raise NotImplementedError
117
118
    def save_event(self, event):
119
        raise NotImplementedError
120
121
    def read_event_history(self, start, end, evfilter):
122
        raise NotImplementedError
123
124
    def _pack_value(self, datavalue):
125
        return struct.pack(self._get_pack_type(datavalue.Value.VariantType), datavalue.Value.Value)
126
127
    def _unpack_value(self, variant_type, binary,):
128
        values_tuple = struct.unpack(self._get_pack_type(variant_type), binary)
129
        return values_tuple[0]
130
131
    def _get_pack_type(self, variant_type):
132
        # see object_ids.py
133
        if variant_type is ua.VariantType.Boolean:
134
            return '?'
135
        elif variant_type is ua.VariantType.SByte:  # Char (string with length of one in python)
136
            return 'c'
137
        elif variant_type is ua.VariantType.Byte:  # Byte (Signed Char in python)
138
            return 'b'
139
        if variant_type is ua.VariantType.Int16:
140
            return 'h'
141
        elif variant_type is ua.VariantType.UInt16:
142
            return 'H'
143
        elif variant_type is ua.VariantType.Int32:
144
            return 'i'
145
        elif variant_type is ua.VariantType.UInt32:
146
            return 'I'
147
        elif variant_type is ua.VariantType.Int64:
148
            return 'q'
149
        elif variant_type is ua.VariantType.UInt64:
150
            return 'Q'
151
        elif variant_type is ua.VariantType.Float:
152
            return 'f'
153
        elif variant_type is ua.VariantType.Double:
154
            return 'd'
155
        elif variant_type is ua.VariantType.String:
156
            return "s"
157
        else:
158
            # FIXME: Should raise exception here that historizing of node type isn't supported in SQL storage interface
159
            return None
160
161
    def _get_table_name(self, node_id):
162
        return str(node_id.NamespaceIndex) + '_' + str(node_id.Identifier)
163
164
    # close connections to the history database when the server stops
165
    def stop(self):
166
        self._conn.close()
167