Completed
Push — master ( 92e9e1...c0b22b )
by Olivier
02:22
created

opcua.DataChangeNotif.__init__()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1
Metric Value
dl 0
loc 3
ccs 3
cts 3
cp 1
rs 10
cc 1
crap 1
1
"""
2
high level interface to subscriptions
3
"""
4 1
import time
5 1
import logging
6 1
from threading import Lock
7
8 1
from opcua import ua
9 1
from opcua import Node
10 1
from opcua import ObjectIds
11 1
from opcua import AttributeIds
12
#from opcua import Event
13
14
15 1
class SubHandler(object):
    """
    Subscription Handler. To receive events from server for a subscription.

    This class is just a sample class: any object providing these methods
    can be used as handler of a Subscription.
    Note that Subscription looks for datachange_notification and
    event_notification first; the deprecated data_change and event methods
    are only called on handlers which do not provide the new ones.
    """

    def data_change(self, handle, node, val, attr):
        """
        Deprecated, use datachange_notification

        Called with the server handle of the monitored item, the node, the
        new value and the id of the monitored attribute.
        """
        pass

    def datachange_notification(self, node, val, data):
        """
        called for every datachange notification from server

        node is the node whose attribute changed, val the new value and
        data a DataChangeNotif object giving access to the raw notification
        """
        pass

    def event_notification(self, event):
        """
        called for every event notification from server

        event is an EventResult object
        """
        pass

    def event(self, handle, event):
        """
        Deprecated use event_notification

        Called with the server handle of the monitored item and an
        EventResult object.
        """
        pass

    def status_change(self, status):
        """
        called for every status change notification from server

        status is the Status field of the notification. Defined here so that
        handlers derived from this sample class silently accept status
        changes instead of failing with AttributeError.
        """
        pass
44
45
46 1
class EventResult():
    """
    Container handed to the client handler for each event coming from the server.

    Only server_handle exists after construction; the string form lists
    whatever attributes the instance holds at that moment.
    """

    def __init__(self):
        self.server_handle = None

    def __str__(self):
        # One "name:value" entry per instance attribute, shown as a list
        pairs = []
        for name, value in self.__dict__.items():
            pairs.append(":".join((str(name), str(value))))
        return "EventResult({})".format(pairs)
    __repr__ = __str__
57
58
59 1
class SubscriptionItemData():
    """
    Record keeping the useful data of one monitored item
    """

    def __init__(self):
        # All fields start unset and are filled in once the item is registered:
        #   node          - node being monitored
        #   client_handle - handle chosen on the client side for the item
        #   server_handle - id given to the item by the server
        #   attribute     - id of the monitored attribute
        #   mfilter       - filter requested for the item, if any
        self.node = self.client_handle = self.server_handle = None
        self.attribute = self.mfilter = None
69
70
71 1
class DataChangeNotif():
    """
    Object passed to clients with every datachange notification from server.

    It simply bundles the data registered for the subscription item with the
    monitored item notification that was received.
    """

    def __init__(self, subscription_data, monitored_item):
        self.monitored_item, self.subscription_data = monitored_item, subscription_data

    def __str__(self):
        template = "DataChangeNotfication({}, {})"
        parts = (self.subscription_data, self.monitored_item)
        return template.format(*parts)
    __repr__ = __str__
82
83
84 1
class Subscription(object):
    """
    Subscription object returned by Server or Client objects.
    The object represent a subscription to an opc-ua server.
    This is a high level class, especially subscribe_data_change
    and subscribe_events methods. If more control is necessary look at
    code and/or use create_monitored_items method.
    """

    def __init__(self, server, params, handler):
        """
        Create the subscription on the server and send the first publish requests.

        :param server: object exposing the services used by this class
            (create_subscription, publish, create_monitored_items,
            delete_monitored_items, delete_subscriptions)
        :param params: subscription parameters, passed as-is to
            server.create_subscription. Its RequestedPublishingInterval is
            also used as sampling interval for monitored items
        :param handler: object receiving the notifications
        """
        self.logger = logging.getLogger(__name__)
        self.server = server
        self._client_handle = 200
        self._handler = handler
        self.parameters = params  # move to data class
        self._monitoreditems_map = {}
        self._lock = Lock()
        self.subscription_id = None
        response = self.server.create_subscription(params, self.publish_callback)
        self.subscription_id = response.SubscriptionId  # move to data class
        # NOTE(review): two publish requests are sent up front, presumably so
        # the server always has one pending while another is processed
        self.server.publish()
        self.server.publish()

    def delete(self):
        """
        Delete subscription on server. This is automatically done by Client and Server classes on exit
        """
        results = self.server.delete_subscriptions([self.subscription_id])
        results[0].check()

    def publish_callback(self, publishresult):
        """
        Dispatch the notifications of a publish result to the handler and
        acknowledge the result by sending a new publish request.
        """
        self.logger.info("Publish callback called with result: %s", publishresult)
        # The callback is registered before __init__ stores the subscription
        # id, so wait for the id to be available before going further
        while self.subscription_id is None:
            time.sleep(0.01)

        for notif in publishresult.NotificationMessage.NotificationData:
            if isinstance(notif, ua.DataChangeNotification):
                self._call_datachange(notif)
            elif isinstance(notif, ua.EventNotificationList):
                self._call_event(notif)
            elif isinstance(notif, ua.StatusChangeNotification):
                self._call_status(notif)
            else:
                self.logger.warning("Notification type not supported yet for notification %s", notif)

        ack = ua.SubscriptionAcknowledgement()
        ack.SubscriptionId = self.subscription_id
        ack.SequenceNumber = publishresult.NotificationMessage.SequenceNumber
        self.server.publish([ack])

    def _get_item_data(self, client_handle):
        """
        Return the data registered for client_handle, or None (after logging
        a warning) when no monitored item is known under that handle.
        """
        with self._lock:
            data = self._monitoreditems_map.get(client_handle)
        if data is None:
            self.logger.warning("Received a notification for unknown handle: %s", client_handle)
        return data

    def _call_datachange(self, datachange):
        """
        Forward every item of a data change notification to the handler.

        Items with an unknown client handle are skipped. Exceptions raised
        while calling the handler are logged and not propagated, so the
        remaining items are still processed.
        """
        for item in datachange.MonitoredItems:
            data = self._get_item_data(item.ClientHandle)
            if data is None:
                continue
            try:
                if hasattr(self._handler, "datachange_notification"):
                    event_data = DataChangeNotif(data, item)
                    self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
                elif hasattr(self._handler, "data_change"):  # deprecated API
                    self.logger.warning("data_change method is deprecated, use datachange_notification")
                    self._handler.data_change(data.server_handle, data.node, item.Value.Value.Value, data.attribute)
                else:
                    self.logger.error("DataChange subscription created but handler has no datachange_notification method")
            except Exception:
                self.logger.exception("Exception calling data change handler")

    def _call_event(self, eventlist):
        """
        Build an EventResult for every event of the list and pass it to the handler.

        Each select clause of the filter registered for the item gives the
        name of one attribute of the result; its value is the event field at
        the same index. Events with an unknown client handle are skipped, and
        exceptions are logged and not propagated.
        """
        for event in eventlist.Events:
            # An unknown handle must not abort the whole publish callback,
            # otherwise the publish result would never be acknowledged
            data = self._get_item_data(event.ClientHandle)
            if data is None:
                continue
            try:
                result = EventResult()
                result.server_handle = data.server_handle
                for idx, sattr in enumerate(data.mfilter.SelectClauses):
                    if not sattr.BrowsePath:
                        # no browse path: name the field after the attribute id
                        name = ua.AttributeIdsInv[sattr.AttributeId]
                    else:
                        name = sattr.BrowsePath[0].Name
                    setattr(result, name, event.EventFields[idx].Value)
                if hasattr(self._handler, "event_notification"):
                    self._handler.event_notification(result)
                elif hasattr(self._handler, "event"):  # deprecated API
                    self._handler.event(data.server_handle, result)
                else:
                    self.logger.error("Event subscription created but handler has no event_notification method")
            except Exception:
                self.logger.exception("Exception calling event handler")

    def _call_status(self, status):
        """
        Pass the Status of a status change notification to the handler.

        A handler without status_change method is reported in the log;
        exceptions raised by the handler are logged and not propagated.
        """
        if not hasattr(self._handler, "status_change"):
            self.logger.error("Status change notification received but handler has no status_change method")
            return
        try:
            self._handler.status_change(status.Status)
        except Exception:
            self.logger.exception("Exception calling status change handler")

    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value):
        """
        Subscribe for data change events for a node or list of nodes.
        default attribute is Value.
        Return a handle which can be used to unsubscribe
        (a list of handles or status codes when a list of nodes is given)
        If more control is necessary use create_monitored_items method
        """
        return self._subscribe(nodes, attr, queuesize=1)

    def _get_node(self, nodeid):
        """
        Return a Node object from a Node, a ua.NodeId or anything accepted
        by the ua.NodeId constructor.
        """
        if isinstance(nodeid, ua.NodeId):
            node = Node(self.server, nodeid)
        elif isinstance(nodeid, Node):
            node = nodeid
        else:
            node = Node(self.server, ua.NodeId(nodeid))
        return node

    def _get_filter_from_event_type(self, eventtype):
        """
        Build an event filter selecting the Value of every variable property
        found under the given event type node.
        """
        eventtype = self._get_node(eventtype)
        evfilter = ua.EventFilter()
        for desc in eventtype.get_children_descriptions(refs=ua.ObjectIds.HasProperty, nodeclassmask=ua.NodeClass.Variable):
            op = ua.SimpleAttributeOperand()
            op.TypeDefinitionId = eventtype.nodeid
            op.AttributeId = AttributeIds.Value
            op.BrowsePath = [desc.BrowseName]
            evfilter.SelectClauses.append(op)
        return evfilter

    def subscribe_events(self, sourcenode=ObjectIds.Server, evtype=ObjectIds.BaseEventType):
        """
        Subscribe to events from a node. Default node is Server node.
        In most servers the server node is the only one you can subscribe to.
        Return a handle which can be used to unsubscribe
        """
        sourcenode = self._get_node(sourcenode)
        evfilter = self._get_filter_from_event_type(evtype)
        return self._subscribe(sourcenode, AttributeIds.EventNotifier, evfilter)

    def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
        """
        Create one monitored item per node for the given attribute.

        When nodes is a list or tuple, return the list produced by
        create_monitored_items (one id or status code per node).
        Otherwise return the single id; if the server answered with a
        status code instead, its check() method is called first
        (presumably raising for a bad status).
        """
        is_list = isinstance(nodes, (list, tuple))
        if not is_list:
            nodes = [nodes]
        mirs = [self._make_monitored_item_request(node, attr, mfilter, queuesize) for node in nodes]

        mids = self.create_monitored_items(mirs)
        if is_list:
            return mids
        if isinstance(mids[0], ua.StatusCode):
            mids[0].check()
        return mids[0]

    def _make_monitored_item_request(self, node, attr, mfilter, queuesize):
        """
        Build the request to monitor attribute attr of node, using a new
        unique client handle and the publishing interval of the
        subscription as sampling interval.
        """
        rv = ua.ReadValueId()
        rv.NodeId = node.nodeid
        rv.AttributeId = attr
        # rv.IndexRange //We leave it null, then the entire array is returned
        mparams = ua.MonitoringParameters()
        with self._lock:
            # handles are used as keys of the monitored items map
            self._client_handle += 1
            mparams.ClientHandle = self._client_handle
        mparams.SamplingInterval = self.parameters.RequestedPublishingInterval
        mparams.QueueSize = queuesize
        mparams.DiscardOldest = True
        if mfilter:
            mparams.Filter = mfilter
        mir = ua.MonitoredItemCreateRequest()
        mir.ItemToMonitor = rv
        mir.MonitoringMode = ua.MonitoringMode.Reporting
        mir.RequestedParameters = mparams
        return mir

    def create_monitored_items(self, monitored_items):
        """
        low level method to have full control over subscription parameters
        Client handle must be unique since it will be used as key for internal registration of data

        Return a list with, for every request and in the same order, the id
        of the created monitored item or the status code of the failure.
        """
        params = ua.CreateMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToCreate = monitored_items
        params.TimestampsToReturn = ua.TimestampsToReturn.Neither

        mids = []
        results = self.server.create_monitored_items(params)
        # FIXME: Race condition here
        # We lock as early as possible. But in some situation, a notification may arrive before
        # locking and we will not be able to process it. To avoid issues, users should subscribe
        # to all nodes at once
        with self._lock:
            for idx, result in enumerate(results):
                mi = params.ItemsToCreate[idx]
                if not result.StatusCode.is_good():
                    mids.append(result.StatusCode)
                    continue
                data = SubscriptionItemData()
                data.client_handle = mi.RequestedParameters.ClientHandle
                data.node = Node(self.server, mi.ItemToMonitor.NodeId)
                data.attribute = mi.ItemToMonitor.AttributeId
                data.server_handle = result.MonitoredItemId
                # the requested filter is stored, not result.FilterResult
                data.mfilter = mi.RequestedParameters.Filter
                self._monitoreditems_map[mi.RequestedParameters.ClientHandle] = data

                mids.append(result.MonitoredItemId)
        return mids

    def unsubscribe(self, handle):
        """
        unsubscribe to datachange or events using the handle returned while subscribing
        if you delete subscription, you do not need to unsubscribe
        """
        params = ua.DeleteMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.MonitoredItemIds = [handle]
        results = self.server.delete_monitored_items(params)
        results[0].check()
        with self._lock:
            # find the key first, then delete: never mutate the map while iterating it
            for client_handle, data in self._monitoreditems_map.items():
                if data.server_handle == handle:
                    break
            else:
                return
            del self._monitoreditems_map[client_handle]
308
309