Completed
Pull Request — master (#156)
by
unknown
02:37
created

opcua.server.HistoryManager.update_history()   A

Complexity

Conditions 2

Size

Total Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 4.5185
Metric Value
cc 2
dl 0
loc 13
ccs 1
cts 7
cp 0.1429
crap 4.5185
rs 9.4285
1 1
from datetime import timedelta
2 1
from datetime import datetime
3
4 1
from opcua import Subscription
5 1
from opcua import ua
6
7 1
from opcua.server.history_interface import HistoryStorageInterface
8
9
# if you want to use an SQL based history uncomment this import and change the storage type of the history manager
10
# from opcua.server.history_sql import HistorySQLite
11
12
13 1
class HistoryDict(HistoryStorageInterface):
14
    """
15
    very minimal history backend storing data in memory using a Python dictionary
16
    """
17 1
    def __init__(self):
18 1
        self._datachanges = {}
19 1
        self._datachanges_period = {}
20 1
        self._events = {}
21
22 1
    def new_historized_node(self, node, period, count=0):
23 1
        node_id = node.nodeid
24 1
        self._datachanges[node_id] = []
25 1
        self._datachanges_period[node_id] = period, count
26
27 1
    def save_node_value(self, node, datavalue):
28 1
        node_id = node.nodeid
29 1
        data = self._datachanges[node_id]
30 1
        period, count = self._datachanges_period[node_id]
31 1
        data.append(datavalue)
32 1
        now = datetime.now()
33 1
        if period:
34 1
            while now - data[0].ServerTimestamp > period:
35
                data.pop(0)
36 1
        if count and len(data) > count:
37 1
            data = data[-count:]
38
39 1
    def read_node_history(self, node, start, end, nb_values):
40 1
        node_id = node.NodeId
41 1
        cont = None
42 1
        if node_id not in self._datachanges:
43
            print("Error attempt to read history for a node which is not historized")
44
            return [], cont
45
        else:
46 1
            if end is None:
47
                end = datetime.now() + timedelta(days=1)
48 1
            if start is None:
49
                start = ua.DateTimeMinValue
50 1
            results = [dv for dv in self._datachanges[node_id] if start <= dv.ServerTimestamp <= end]
51 1
            if nb_values:
52 1
                if start > ua.DateTimeMinValue and len(results) > nb_values:
53 1
                    cont = results[nb_values + 1].ServerTimestamp
54 1
                    results = results[:nb_values]
55
                else:
56 1
                    results = results[-nb_values:]
57 1
            return results, cont
58
59 1
    def new_historized_event(self, event, period):
60
        self._events = []
61
62 1
    def save_event(self, event):
63
        raise NotImplementedError
64
65 1
    def read_event_history(self, start, end, evfilter):
66
        raise NotImplementedError
67
68 1
    def stop(self):
69 1
        pass
70
71
72 1
class SubHandler(object):
73 1
    def __init__(self, storage):
74 1
        self.storage = storage
75
76 1
    def datachange_notification(self, node, val, data):
77 1
        self.storage.save_node_value(node, data.monitored_item.Value)
78
79 1
    def event_notification(self, event):
80
        self.storage.save_event(event)
81
82
83 1
class HistoryManager(object):
84 1
    def __init__(self, iserver):
85 1
        self.iserver = iserver
86 1
        self.storage = HistoryDict()  # Change to HistorySQLite() for file based history
87 1
        self._sub = None
88 1
        self._handlers = {}
89
90 1
    def set_storage(self, storage):
91
        self.storage = storage
92
93 1
    def _create_subscription(self, handler):
94 1
        params = ua.CreateSubscriptionParameters()
95 1
        params.RequestedPublishingInterval = 10
96 1
        params.RequestedLifetimeCount = 3000
97 1
        params.RequestedMaxKeepAliveCount = 10000
98 1
        params.MaxNotificationsPerPublish = 0
99 1
        params.PublishingEnabled = True
100 1
        params.Priority = 0
101 1
        return Subscription(self.iserver.isession, params, handler)
102
103 1
    def historize(self, node, period=timedelta(days=7), count=0):
104 1
        if not self._sub:
105 1
            self._sub = self._create_subscription(SubHandler(self.storage))
106 1
        if node in self._handlers:
107
            raise ua.UaError("Node {} is allready historized".format(node))
108 1
        self.storage.new_historized_node(node, period, count)
109 1
        handler = self._sub.subscribe_data_change(node)
110 1
        self._handlers[node] = handler
111
112 1
    def dehistorize(self, node):
113 1
        self._sub.unsubscribe(self._handlers[node])
114 1
        del(self._handlers[node])
115
116 1
    def read_history(self, params):
117
        """
118
        Read history for a node
119
        This is the part AttributeService, but implemented as its own service
120
        since it requires more logic than other attribute service methods
121
        """
122 1
        results = []
123
        
124 1
        for rv in params.NodesToRead:
125 1
            res = self._read_history(params.HistoryReadDetails, rv)
126 1
            results.append(res)
127 1
        return results
128
        
129 1
    def _read_history(self, details, rv):
130
        """
131
        read history for a node
132
        """
133 1
        result = ua.HistoryReadResult()
134 1
        if isinstance(details, ua.ReadRawModifiedDetails):
135 1
            if details.IsReadModified:
136
                result.HistoryData = ua.HistoryModifiedData()
137
                # we do not support modified history by design so we return what we have
138
            else:
139 1
                result.HistoryData = ua.HistoryData()
140 1
            dv, cont = self._read_datavalue_history(rv, details)
141 1
            result.HistoryData.DataValues = dv
142 1
            result.ContinuationPoint = cont
143
144
        elif isinstance(details, ua.ReadEventDetails):
145
            result.HistoryData = ua.HistoryEvent()
146
            # FIXME: filter is a cumbersome type, maybe transform it something easier
147
            # to handle for storage
148
            result.HistoryData.Events = self.storage.read_event_history(details.StartTime,
149
                                                                        details.EndTime,
150
                                                                        details.Filter)
151
        else:
152
            # we do not currently support the other types, clients can process data themselves
153
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
154 1
        return result
155
156 1
    def _read_datavalue_history(self, rv, details):
157 1
        starttime = details.StartTime
158 1
        if rv.ContinuationPoint:
159
            # Spec says we should ignore details if cont point is present
160
            # but they also say we can use cont point as timestamp to enable stateless
161
            # implementation. This is contradictory, so we assume details is
162
            # send correctly with continuation point
163
            # starttime = bytes_to_datetime(rv.ContinuationPoint)
164
            starttime = ua.unpack_datetime(rv.ContinuationPoint)
165
166 1
        dv, cont = self.storage.read_node_history(rv,
167
                                                  starttime,
168
                                                  details.EndTime,
169
                                                  details.NumValuesPerNode)
170 1
        if cont:
171
            # cont = datetime_to_bytes(dv[-1].ServerTimestamp)
172 1
            cont = ua.pack_datetime(dv[-1].ServerTimestamp)
173
        # FIXME, parse index range and filter out if necessary
174
        # rv.IndexRange
175
        # rv.DataEncoding # xml or binary, seems spec say we can ignore that one
176 1
        return dv, cont
177
178 1
    def update_history(self, params):
179
        """
180
        Update history for a node
181
        This is the part AttributeService, but implemented as its own service
182
        since it requires more logic than other attribute service methods
183
        """
184
        results = []
185
        for _ in params.HistoryUpdateDetails:
186
            result = ua.HistoryUpdateResult()
187
            # we do not accept to rewrite history
188
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotWritable)
189
            results.append(results)
190
        return results
191
192 1
    def stop(self):
193
        self.storage.stop()
194