Completed
Push — master ( 5a9df5...39e3a6 )
by Olivier
223:12 queued 212:11
created

opcua.common.Subscription._call_event()   D

Complexity

Conditions 8

Size

Total Lines 24

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 14
CRAP Score 8.7021
Metric Value
cc 8
dl 0
loc 24
ccs 14
cts 18
cp 0.7778
crap 8.7021
rs 4.3478
1
"""
2
high level interface to subscriptions
3
"""
4 1
import time
5 1
import logging
6 1
from threading import Lock
7
8 1
from opcua import ua
9 1
from opcua import Node
10
11
12 1
class SubHandler(object):
13
    """
14
    Subscription Handler. To receive events from server for a subscription
15
    This class is just a sample class. Whatever class having these methods can be used
16
    """
17
18 1
    def data_change(self, handle, node, val, attr):
19
        """
20
        Deprecated, use datachange_notification
21
        """
22
        pass
23
24 1
    def datachange_notification(self, node, val, data):
25
        """
26
        called for every datachange notfication from server
27
        """
28
        pass
29
30 1
    def event_notification(self, event):
31
        """
32
        called for every event notfication from server
33
        """
34
        pass
35
36 1
    def event(self, handle, event):
37
        """
38
        Deprecated use event_notification
39
        """
40
        pass
41
42
43 1
class EventResult():
44
    """
45
    To be sent to clients for every events from server
46
    """
47
48 1
    def __init__(self):
49 1
        self.server_handle = None
50
51 1
    def __str__(self):
52
        return "EventResult({})".format([str(k) + ":" + str(v) for k, v in self.__dict__.items()])
53 1
    __repr__ = __str__
54
55
56 1
class SubscriptionItemData():
57
    """
58
    To store usefull data from a monitored item
59
    """
60 1
    def __init__(self):
61 1
        self.node = None
62 1
        self.client_handle = None
63 1
        self.server_handle = None
64 1
        self.attribute = None
65 1
        self.mfilter = None
66
67
68 1
class DataChangeNotif():
69
    """
70
    To be send to clients for every datachange notification from server
71
    """
72 1
    def __init__(self, subscription_data, monitored_item):
73 1
        self.monitored_item = monitored_item
74 1
        self.subscription_data = subscription_data
75
76 1
    def __str__(self):
77
        return "DataChangeNotfication({}, {})".format(self.subscription_data, self.monitored_item)
78 1
    __repr__ = __str__
79
80
81 1
class Subscription(object):
82
    """
83
    Subscription object returned by Server or Client objects.
84
    The object represent a subscription to an opc-ua server.
85
    This is a high level class, especially subscribe_data_change 
86
    and subscribe_events methods. If more control is necessary look at
87
    code and/or use create_monitored_items method.
88
    """
89
90 1
    def __init__(self, server, params, handler):
91 1
        self.logger = logging.getLogger(__name__)
92 1
        self.server = server
93 1
        self._client_handle = 200
94 1
        self._handler = handler
95 1
        self.parameters = params  # move to data class
96 1
        self._monitoreditems_map = {}
97 1
        self._lock = Lock()
98 1
        self.subscription_id = None
99 1
        response = self.server.create_subscription(params, self.publish_callback)
100 1
        self.subscription_id = response.SubscriptionId  # move to data class
101 1
        self.server.publish()
102 1
        self.server.publish()
103
104 1
    def delete(self):
105
        """
106
        Delete subscription on server. This is automatically done by Client and Server classes on exit
107
        """
108 1
        results = self.server.delete_subscriptions([self.subscription_id])
109 1
        results[0].check()
110
111 1
    def publish_callback(self, publishresult):
112 1
        self.logger.info("Publish callback called with result: %s", publishresult)
113 1
        while self.subscription_id is None:
114 1
            time.sleep(0.01)
115
116 1
        for notif in publishresult.NotificationMessage.NotificationData:
117 1
            if isinstance(notif, ua.DataChangeNotification):
118 1
                self._call_datachange(notif)
119 1
            elif isinstance(notif, ua.EventNotificationList):
120 1
                self._call_event(notif)
121
            elif isinstance(notif, ua.StatusChangeNotification):
122
                self._call_status(notif)
123
            else:
124
                self.logger.warning("Notification type not supported yet for notification %s", notif)
125
126 1
        ack = ua.SubscriptionAcknowledgement()
127 1
        ack.SubscriptionId = self.subscription_id
128 1
        ack.SequenceNumber = publishresult.NotificationMessage.SequenceNumber
129 1
        self.server.publish([ack])
130
131 1
    def _call_datachange(self, datachange):
132 1
        for item in datachange.MonitoredItems:
133 1
            with self._lock:
134 1
                if item.ClientHandle not in self._monitoreditems_map:
135 1
                    self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle)
136 1
                    continue
137 1
                data = self._monitoreditems_map[item.ClientHandle]
138 1
            try:
139 1
                if hasattr(self._handler, "datachange_notification"):
140 1
                    event_data = DataChangeNotif(data, item)
141 1
                    self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
142 1
                elif hasattr(self._handler, "data_change"):  # deprecated API
143 1
                    self.logger.warning("data_change method is deprecated, use datavalue_changed")
144 1
                    self._handler.data_change(data.server_handle, data.node, item.Value.Value.Value, data.attribute)
145
                else:
146
                    self.logger.error("DataChange subscription created but handler has no datachange_notification method")
147
            except Exception:
148
                self.logger.exception("Exception calling data change handler")
149
150 1
    def _call_event(self, eventlist):
151 1
        for event in eventlist.Events:
152 1
            with self._lock:
153 1
                data = self._monitoreditems_map[event.ClientHandle]
154 1
            try:
155
                #fields = {}
156 1
                result = EventResult()
157 1
                result.server_handle = data.server_handle
158 1
                for idx, sattr in enumerate(data.mfilter.SelectClauses):
159
160 1
                    if len(sattr.BrowsePath) == 0:
161
                        #fields[ua.AttributeIdsInv[sattr.AttributeId]] = event.EventFields[idx].Value
162
                        #setattr(result, ua.AttributeIdsInv[sattr.AttributeId], event.EventFields[idx].Value)
163
                        setattr(result, sattr.AttributeId.name, event.EventFields[idx].Value)
164
                    else:
165 1
                        setattr(result, sattr.BrowsePath[0].Name, event.EventFields[idx].Value)
166 1
                if hasattr(self._handler, "event_notification"):
167 1
                    self._handler.event_notification(result)
168 1
                elif hasattr(self._handler, "event"):  # depcrecated API
169 1
                    self._handler.event(data.server_handle, result)
170
                else:
171
                    self.logger.error("Event subscription created but handler has no event_notification method")
172
            except Exception:
173
                self.logger.exception("Exception calling event handler")
174
175 1
    def _call_status(self, status):
176
        try:
177
            self._handler.status_change(status.Status)
178
        except Exception:
179
            self.logger.exception("Exception calling status change handler")
180
181 1
    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value):
182
        """
183
        Subscribe for data change events for a node or list of nodes.
184
        default attribute is Value.
185
        Return a handle which can be used to unsubscribe
186
        If more control is necessary use create_monitored_items method
187
        """
188 1
        return self._subscribe(nodes, attr, queuesize=1)
189
190 1
    def _get_node(self, nodeid):
191 1
        if isinstance(nodeid, ua.NodeId):
192
            node = Node(self.server, nodeid)
193 1
        elif isinstance(nodeid, Node):
194 1
            node = nodeid
195
        else:
196 1
            node = Node(self.server, ua.NodeId(nodeid))
197 1
        return node
198
199 1
    def _get_filter_from_event_type(self, eventtype):
200 1
        eventtype = self._get_node(eventtype)
201 1
        evfilter = ua.EventFilter()
202 1
        for desc in eventtype.get_children_descriptions(refs=ua.ObjectIds.HasProperty, nodeclassmask=ua.NodeClass.Variable):
203 1
            op = ua.SimpleAttributeOperand()
204 1
            op.TypeDefinitionId = eventtype.nodeid
205 1
            op.AttributeId = ua.AttributeIds.Value
206 1
            op.BrowsePath = [desc.BrowseName]
207 1
            evfilter.SelectClauses.append(op)
208 1
        return evfilter
209
210 1
    def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtype=ua.ObjectIds.BaseEventType):
211
        """
212
        Subscribe to events from a node. Default node is Server node. 
213
        In most servers the server node is the only one you can subscribe to.
214
        Return a handle which can be used to unsubscribe
215
        """
216 1
        sourcenode = self._get_node(sourcenode)
217 1
        evfilter = self._get_filter_from_event_type(evtype)
218 1
        return self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter)
219
220 1
    def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
221 1
        is_list = True
222 1
        if not type(nodes) in (list, tuple):
223 1
            is_list = False
224 1
            nodes = [nodes]
225 1
        mirs = []
226 1
        for node in nodes:
227 1
            mir = self._make_monitored_item_request(node, attr, mfilter, queuesize)
228 1
            mirs.append(mir)
229
230 1
        mids = self.create_monitored_items(mirs)
231 1
        if is_list:
232 1
            return mids
233 1
        if type(mids[0]) == ua.StatusCode:
234 1
            mids[0].check()
235 1
        return mids[0]
236
237 1
    def _make_monitored_item_request(self, node, attr, mfilter, queuesize):
238 1
        rv = ua.ReadValueId()
239 1
        rv.NodeId = node.nodeid
240 1
        rv.AttributeId = attr
241
        # rv.IndexRange //We leave it null, then the entire array is returned
242 1
        mparams = ua.MonitoringParameters()
243 1
        with self._lock:
244 1
            self._client_handle += 1
245 1
            mparams.ClientHandle = self._client_handle
246 1
        mparams.SamplingInterval = self.parameters.RequestedPublishingInterval
247 1
        mparams.QueueSize = queuesize
248 1
        mparams.DiscardOldest = True
249 1
        if mfilter:
250 1
            mparams.Filter = mfilter
251 1
        mir = ua.MonitoredItemCreateRequest()
252 1
        mir.ItemToMonitor = rv
253 1
        mir.MonitoringMode = ua.MonitoringMode.Reporting
254 1
        mir.RequestedParameters = mparams
255 1
        return mir
256
257 1
    def create_monitored_items(self, monitored_items):
258
        """
259
        low level method to have full control over subscription parameters
260
        Client handle must be unique since it will be used as key for internal registration of data
261
        """
262 1
        params = ua.CreateMonitoredItemsParameters()
263 1
        params.SubscriptionId = self.subscription_id
264 1
        params.ItemsToCreate = monitored_items
265 1
        params.TimestampsToReturn = ua.TimestampsToReturn.Neither
266
        
267 1
        mids = []
268 1
        results = self.server.create_monitored_items(params)
269
        # FIXME: Race condition here
270
        # We lock as early as possible. But in some situation, a notification may arrives before
271
        # locking and we will not be able to prosess it. To avoid issues, users should subscribe 
272
        # to all nodes at once
273 1
        with self._lock:  
274 1
            for idx, result in enumerate(results):
275 1
                mi = params.ItemsToCreate[idx]
276 1
                if not result.StatusCode.is_good():
277 1
                    mids.append(result.StatusCode)
278 1
                    continue
279 1
                data = SubscriptionItemData()
280 1
                data.client_handle = mi.RequestedParameters.ClientHandle
281 1
                data.node = Node(self.server, mi.ItemToMonitor.NodeId)
282 1
                data.attribute = mi.ItemToMonitor.AttributeId
283 1
                data.server_handle = result.MonitoredItemId
284
                #data.mfilter = result.FilterResult
285 1
                data.mfilter = mi.RequestedParameters.Filter
286 1
                self._monitoreditems_map[mi.RequestedParameters.ClientHandle] = data
287
288 1
                mids.append(result.MonitoredItemId)
289 1
        return mids
290
291 1
    def unsubscribe(self, handle):
292
        """
293
        unsubscribe to datachange or events using the handle returned while subscribing
294
        if you delete subscription, you do not need to unsubscribe
295
        """
296 1
        params = ua.DeleteMonitoredItemsParameters()
297 1
        params.SubscriptionId = self.subscription_id
298 1
        params.MonitoredItemIds = [handle]
299 1
        results = self.server.delete_monitored_items(params)
300 1
        results[0].check()
301 1
        with self._lock:
302 1
            for k, v in self._monitoreditems_map.items():
303 1
                if v.server_handle == handle:
304 1
                    del(self._monitoreditems_map[k])
305 1
                    return
306
307