Completed
Pull Request — master (#140)
by Olivier
02:29
created

read_datavalue_history()   A

Complexity

Conditions 3

Size

Total Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 9.0292
Metric Value
cc 3
dl 0
loc 19
ccs 1
cts 8
cp 0.125
crap 9.0292
rs 9.4285
1 1
from datetime import timedelta
2 1
from datetime import datetime
3 1
import time
4 1
import struct
5
6 1
from opcua import Subscription
7 1
from opcua import ua
8
9
10 1
def datetime_to_bytes(dt):
11
    time_float = time.mktime(dt.timetuple()) + dt.microsecond / 1E6
12
    return struct.pack("!L", time_float)
13
14
15 1
def bytes_to_datetime(data):
16
    time_float = struct.unpack('!L', data)[0]
17
    return datetime.fromtimestamp(time_float)
18
19
20 1
class HistoryStorageInterface(object):
21
    """
22
    Interface of a history backend
23
    """
24 1
    def new_node(self, node, period, count=0):
25
        raise NotImplementedError
26
27 1
    def save_datavalue(self, node, datavalue):
28
        raise NotImplementedError
29
30 1
    def read_datavalues(self, node, start, end, nb_values):
31
        raise NotImplementedError
32
33 1
    def new_event(self, event, period):
34
        raise NotImplementedError
35
36 1
    def save_event(self, event):
37
        raise NotImplementedError
38
39 1
    def read_events(self, start, end, evfilter):
40
        raise NotImplementedError
41
42
43 1
class HistoryDict(HistoryStorageInterface):
44
    """
45
    very minimal history backend storing data in memory using a Python dictionnary
46
    """
47 1
    def __init__(self):
48 1
        self._datachanges = {}
49 1
        self._datachanges_period = {}
50 1
        self._events = {}
51
52 1
    def new_node(self, node, period, count=0):
53 1
        self._datachanges[node] = []
54 1
        self._datachanges_period[node] = period, count
55
56 1
    def new_event(self, event, period):
57
        self._events = []
58
59 1
    def save_datavalue(self, node, datavalue):
60 1
        print("saving", node, datavalue)
61 1
        data = self._datachanges[node]
62 1
        period, count = self._datachanges_period[node]
63 1
        data.append(datavalue)
64 1
        now = datetime.now()
65 1
        if period:
66 1
            while now - data[0].ServerTimestamp > period:
67
                data.pop(0)
68 1
        if count and len(data) > count:
69 1
            data = data[-count:]
70
71 1
    def read_datavalues(self, node, start, end, nb_values):
72
        if node not in self._datachanges:
73
            return []
74
        else:
75
            # FIME: improve algo
76
            return [dv for dv in self._datachanges[node] if start <= dv.ServerTimestamp <= end]
77
78 1
    def save_event(self, timestamp, event):
79
        raise NotImplementedError
80
81 1
    def read_events(self, start, end, evfilter):
82
        raise NotImplementedError
83
84
85 1
class SubHandler(object):
86 1
    def __init__(self, storage):
87 1
        self.storage = storage
88
89 1
    def datachange_notification(self, node, val, data):
90 1
        self.storage.save_datavalue(node, data.monitored_item.Value)
91
92 1
    def event_notification(self, event):
93
        self.storage.save_event(event)
94
95
96 1
class HistoryManager(object):
97 1
    def __init__(self, iserver):
98 1
        self.iserver = iserver
99 1
        self.storage = HistoryDict()
100 1
        self._sub = None
101 1
        self._handlers = {}
102
103 1
    def set_storage(self, storage):
104
        self.storage = storage
105
106 1
    def _create_subscription(self, handler):
107 1
        params = ua.CreateSubscriptionParameters()
108 1
        params.RequestedPublishingInterval = 10
109 1
        params.RequestedLifetimeCount = 3000
110 1
        params.RequestedMaxKeepAliveCount = 10000
111 1
        params.MaxNotificationsPerPublish = 0
112 1
        params.PublishingEnabled = True
113 1
        params.Priority = 0
114 1
        return Subscription(self.iserver.isession, params, handler)
115
116 1
    def historize(self, node, period=timedelta(days=7), count=0):
117 1
        if not self._sub:
118 1
            self._sub = self._create_subscription(SubHandler(self.storage))
119 1
        if node in self._handlers:
120
            raise ua.UaError("Node {} is allready historized".format(node))
121 1
        self.storage.new_node(node, period, count)
122 1
        handler = self._sub.subscribe_data_change(node)
123 1
        self._handlers[node] = handler
124
125 1
    def dehistorize(self, node):
126 1
        self._sub.unsubscribe(self._handlers[node])
127 1
        del(self._handlers[node])
128
129 1
    def read_history(self, params):
130
        """
131
        Read history for a node
132
        This is the part AttributeService, but implemented as its own service
133
        since it requires more logic than other attribute service methods
134
        """
135
        results = []
136
        
137
        for rv in params.NodesToRead:
138
            res = self._read_history(params.HistoryReadDetails, rv)
139
            results.append(res)
140
        return results
141
        
142 1
    def _read_history(self, details, rv):
143
        """ read history for a node 
144
        """
145
        result = ua.HistoryReadResult()
146
        if type(details) is ua.ReadRawModifiedDetails:
147
            if details.IsReadModified:
148
                result.HistoryData = ua.HistoryModifiedData()
149
                # we do not support modified history by design so we return what we have
150
                dv, cont = self._read_datavalue_history(rv, details)
151
                result.HistoryData.DataValues = dv
152
                result.ContinuationPoint = cont
153
            else:
154
                result.HistoryData = ua.HistoryData()
155
                dv, cont = self._read_datavalue_history(rv, details)
156
                result.HistoryData.DataValues = dv
157
                result.ContinuationPoint = cont
158
159
        elif type(details) is ua.ReadEventDetails:
160
            result.HistoryData = ua.HistoryEvent()
161
            # FIXME: filter is a cumbersome type, maybe transform it something easier
162
            # to handle for storage
163
            result.HistoryData.Events = self.storage.read_events(details.StartTime,
164
                                                                 details.EndTime,
165
                                                                 details.Filter)
166
        else:
167
            # we do not currently support the other types, clients can process data themselves
168
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
169
        return result
170
171 1
    def read_datavalue_history(self, rv, details):
172
        starttime = details.StartTime
173
        if rv.ContinuationPoint:
174
            # Spec says we should ignore details if cont point is present
175
            # but they also say we can use cont point as timestamp to enable stateless
176
            # implementation. This is contradictory, so we assume details is
177
            # send correctly with continuation point
178
            starttime = bytes_to_datetime(rv.ContinuationPoint)
179
180
        dv, cont = self.storage.read_datavalues(rv.NodeId,
181
                                                starttime,
182
                                                details.EndTime,
183
                                                details.NumValuesPerNode)
184
        if cont:
185
            cont = datetime_to_bytes(dv[-1].SourceTimeStamp)
186
        # FIXME, parse index range and filter out if necesary
187
        # rv.IndexRange
188
        # rv.DataEncoding # xml or binary, seems spec say we can ignore that one
189
        return dv, cont
190
191 1
    def update_history(self, params):
192
        """
193
        Update history for a node
194
        This is the part AttributeService, but implemented as its own service
195
        since it requires more logic than other attribute service methods
196
        """
197
        results = []
198
        for details in params.HistoryUpdateDetails:
199
            result = ua.HistoryUpdateResult()
200
            # we do not accept to rewrite history
201
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotWritable)
202
            results.append(results)
203
        return results
204
205
206
207
208