Completed
Pull Request — master (#140)
by Olivier
02:30
created

opcua.server.HistoryDict   A

Complexity

Total Complexity 11

Size/Duplication

Total Lines 36
Duplicated Lines 0 %

Test Coverage

Coverage 72%
Metric Value
dl 0
loc 36
ccs 18
cts 25
cp 0.72
rs 10
wmc 11

7 Methods

Rating   Name   Duplication   Size   Complexity  
A __init__() 0 4 1
A read_node_value() 0 6 4
A new_node() 0 3 1
A save_node_value() 0 7 2
A new_event() 0 2 1
A save_event() 0 2 1
A read_event() 0 2 1
1 1
from datetime import timedelta
2 1
from datetime import datetime
3
4 1
from opcua import Subscription
5 1
from opcua import ua
6
7
8 1
class HistoryStorageInterface(object):
    """
    Interface of a history backend.

    Every method of this base class raises NotImplementedError; a backend
    subclasses it and overrides the methods it supports.
    """

    def new_node(self, node, period):
        """
        Start keeping history for ``node``.

        HistoryManager.historize() calls this on its storage, passing the
        node and the ``period`` it received, before subscribing to the node.
        """
        raise NotImplementedError

    def new_event(self, period):
        """
        Start keeping history for events.

        NOTE(review): the exact meaning of ``period`` for events is not
        established by this interface -- confirm with the backend in use.
        """
        raise NotImplementedError

    def save_node_value(self, node, timestamp, datavalue):
        """
        Store ``datavalue`` for ``node``.

        NOTE(review): SubHandler.datachange_notification() and HistoryDict
        use a two-argument form ``(node, datavalue)`` without ``timestamp``;
        the signatures still need to be reconciled.
        """
        raise NotImplementedError

    def read_node_value(self, node, start, end):
        """Return the values stored for ``node`` between ``start`` and ``end``."""
        raise NotImplementedError

    def save_event(self, timestamp, event):
        """Store ``event`` together with its ``timestamp``."""
        raise NotImplementedError

    def read_event(self, event, start, end):
        """Return the stored occurrences of ``event`` between ``start`` and ``end``."""
        raise NotImplementedError
23
24
25 1
class HistoryDict(HistoryStorageInterface):
    """
    Very minimal history backend storing data in memory using a Python dictionary.
    """

    def __init__(self):
        # node -> list of saved data values, in the order they were saved
        self._datachanges = {}
        # node -> period given to new_node(); compared against value age
        self._datachanges_period = {}
        self._events = {}

    def new_node(self, node, period):
        """Register ``node`` with an empty history and remember its ``period``."""
        self._datachanges[node] = []
        self._datachanges_period[node] = period

    def new_event(self, period):
        """
        Reset the event store.

        ``period`` is currently not used.
        """
        self._events = []

    def save_node_value(self, node, datavalue):
        """
        Append ``datavalue`` to the history of ``node`` and drop expired values.

        Values are removed from the front of the list while their
        ``ServerTimestamp`` is older than the node's period, measured
        against ``datetime.now()``.

        Raises KeyError if ``node`` was never registered with new_node().
        """
        data = self._datachanges[node]
        period = self._datachanges_period[node]
        data.append(datavalue)
        now = datetime.now()
        # Guard on ``data``: when every entry (including the one just
        # appended) is expired the list becomes empty, and indexing
        # data[0] would raise IndexError.
        while data and now - data[0].ServerTimestamp > period:
            data.pop(0)

    def read_node_value(self, node, start, end):
        """
        Return the values of ``node`` whose ``ServerTimestamp`` lies in
        ``[start, end]`` (both bounds inclusive).

        An unknown node yields an empty list.
        """
        if node not in self._datachanges:
            return []
        # FIXME: improve algo
        return [dv for dv in self._datachanges[node] if start <= dv.ServerTimestamp <= end]

    def save_event(self, timestamp, event):
        """Not implemented by this backend."""
        raise NotImplementedError

    def read_event(self, event, start, end):
        """Not implemented by this backend."""
        raise NotImplementedError
61
62
63 1
class SubHandler(object):
    """
    Subscription handler that forwards notifications to a history storage.
    """

    def __init__(self, storage):
        # Backend that receives every notification; fixed at construction.
        self.storage = storage

    def datachange_notification(self, node, val, data):
        """Save the value carried by ``data.monitored_item`` for ``node``."""
        print("Python: New data change event", node, val, data)
        self.storage.save_node_value(node, data.monitored_item.Value)

    def event_notification(self, event):
        """
        Save ``event`` in the storage.

        The storage API is ``save_event(timestamp, event)``; passing only the
        event raised TypeError before the backend was even reached. The time
        at which the notification is handled is used as the timestamp.
        """
        print("Python: New event", event)
        self.storage.save_event(datetime.now(), event)
74
75
76 1
class HistoryManager(object):
    """
    Keeps the history of selected nodes by subscribing to their data changes
    and forwarding every notification to a history storage.
    """

    def __init__(self, iserver):
        self.iserver = iserver
        self.storage = HistoryDict()
        # Created lazily by the first call to historize().
        self._sub = None
        # node -> handle returned by subscribe_data_change(), needed to unsubscribe
        self._handlers = {}

    def set_storage(self, storage):
        """
        Replace the history storage.

        Call this before the first historize(): the subscription handler is
        created there with the storage current at that time, so a later
        replacement is not seen by the handler.
        """
        self.storage = storage

    def _create_subscription(self, handler):
        """Create the subscription whose notifications are sent to ``handler``."""
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = 10
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 0
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.iserver.isession, params, handler)

    def historize(self, node, period=timedelta(days=7)):
        """
        Start keeping history for ``node``.

        ``period`` is handed to the storage's new_node().

        Raises ua.UaError if ``node`` is already historized.
        """
        if not self._sub:
            self._sub = self._create_subscription(SubHandler(self.storage))
        if node in self._handlers:
            raise ua.UaError("Node {} is allready historized".format(node))
        self.storage.new_node(node, period)
        handler = self._sub.subscribe_data_change(node)
        self._handlers[node] = handler

    def dehistorize(self, node):
        """
        Stop keeping history for ``node``.

        Raises KeyError if ``node`` is not historized.
        """
        # Look the handle up first so that an unknown node always raises
        # KeyError, even when no subscription has been created yet
        # (self._sub is still None in that case).
        handler = self._handlers[node]
        self._sub.unsubscribe(handler)
        del self._handlers[node]

    def read_history(self, params):
        """
        Read history for a node
        This is part of the AttributeService, but implemented as its own
        service since it requires more logic than other attribute service
        methods

        Returns one result per entry of ``params.NodesToRead``, in order.
        """
        return [
            self._read_history(params.HistoryReadDetails, rv)
            for rv in params.NodesToRead
        ]

    def _read_history(self, details, rv):
        """
        Read the history of the single node described by ``rv``.

        Only ua.ReadRawModifiedDetails is handled; the stored values between
        ``details.StartTime`` and ``details.EndTime`` are returned as given
        by the storage's read_node_value().

        Raises NotImplementedError for any other kind of ``details``.

        NOTE(review): the values are returned unwrapped; building a proper
        history read result structure for the client is still to be done.
        NOTE(review): historize() registers the node object with the storage
        while this looks up ``rv.NodeId`` -- confirm that both identify the
        same storage entry.
        """
        if isinstance(details, ua.ReadRawModifiedDetails):
            return self.storage.read_node_value(
                rv.NodeId, details.StartTime, details.EndTime
            )
        raise NotImplementedError(
            "History read details of type {} are not supported".format(
                type(details).__name__
            )
        )

    def update_history(self, params):
        """
        Update history for a node
        This is part of the AttributeService, but implemented as its own
        service since it requires more logic than other attribute service
        methods

        Not implemented yet: does nothing and returns None.
        """
        pass
135
136
137
138
139