Completed
Push — master ( 4652a3...eeb0ed )
by Olivier
03:54
created

Subscription.unsubscribe()   A

Complexity

Conditions 4

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 4

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 4
c 1
b 0
f 0
dl 0
loc 15
ccs 7
cts 7
cp 1
crap 4
rs 9.2
"""
high level interface to subscriptions
"""
import time
import logging
from threading import Lock

from opcua import ua
from opcua.common import events
from opcua import Node


class SubHandler(object):
    """
    Subscription Handler. To receive events from server for a subscription.

    This class is just a sample class. Any object providing these methods
    can be used as a handler; every method here is a no-op placeholder.
    """

    def data_change(self, handle, node, val, attr):
        """
        Deprecated, use datachange_notification
        """
        pass

    def datachange_notification(self, node, val, data):
        """
        called for every datachange notification from server
        """
        pass

    def event_notification(self, event):
        """
        called for every event notification from server
        """
        pass

    def status_change_notification(self, status):
        """
        called for every status change notification from server
        """
        pass


class SubscriptionItemData(object):
    """
    To store useful data from a monitored item
    """

    def __init__(self):
        # All fields start unset; they are filled in by whoever registers the item.
        self.node = None            # node being monitored
        self.client_handle = None   # handle chosen on the client side
        self.server_handle = None   # handle returned by the server
        self.attribute = None       # id of the monitored attribute
        self.mfilter = None         # monitoring filter attached to the item, if any


class DataChangeNotif(object):
    """
    To be sent to clients for every datachange notification from server
    """

    def __init__(self, subscription_data, monitored_item):
        # monitored_item: the notification item received for this change
        self.monitored_item = monitored_item
        # subscription_data: the locally stored data registered for that item
        self.subscription_data = subscription_data

    def __str__(self):
        return "DataChangeNotification({}, {})".format(self.subscription_data, self.monitored_item)

    # The same text is used for repr() so notifications are readable in logs.
    __repr__ = __str__


class Subscription(object):
    """
    Subscription object returned by Server or Client objects.
    The object represent a subscription to an opc-ua server.
    This is a high level class, especially subscribe_data_change
    and subscribe_events methods. If more control is necessary look at
    code and/or use create_monitored_items method.
    """

    def __init__(self, server, params, handler):
        """
        Create the subscription on the server and start publishing.

        :param server: object used for all server calls (create_subscription,
            publish, create/modify/delete_monitored_items, delete_subscriptions)
        :param params: subscription parameters passed to create_subscription;
            its RequestedPublishingInterval is reused as sampling interval
        :param handler: object receiving the notifications (see SubHandler)
        """
        self.logger = logging.getLogger(__name__)
        self.server = server
        # Counter used to allocate client handles; incremented for every
        # monitored item request built by _make_monitored_item_request.
        self._client_handle = 200
        self._handler = handler
        self.parameters = params  # move to data class
        # client handle -> SubscriptionItemData
        self._monitoreditems_map = {}
        self._lock = Lock()
        # Must be None before the subscription is created: publish_callback
        # waits until the id has been assigned.
        self.subscription_id = None
        response = self.server.create_subscription(params, self.publish_callback)
        self.subscription_id = response.SubscriptionId  # move to data class
        # NOTE(review): two publish requests are issued up front, presumably so
        # that one is always outstanding on the server - confirm.
        self.server.publish()
        self.server.publish()

    def delete(self):
        """
        Delete subscription on server. This is automatically done by Client and Server classes on exit
        """
        results = self.server.delete_subscriptions([self.subscription_id])
        results[0].check()

    def publish_callback(self, publishresult):
        """
        Dispatch every notification of a publish result to the handler,
        then acknowledge the message and send a new publish request.
        """
        self.logger.info("Publish callback called with result: %s", publishresult)
        # The callback may fire before __init__ has stored the subscription id;
        # wait for it since it is needed for the acknowledgement below.
        while self.subscription_id is None:
            time.sleep(0.01)

        for notif in publishresult.NotificationMessage.NotificationData:
            if isinstance(notif, ua.DataChangeNotification):
                self._call_datachange(notif)
            elif isinstance(notif, ua.EventNotificationList):
                self._call_event(notif)
            elif isinstance(notif, ua.StatusChangeNotification):
                self._call_status(notif)
            else:
                self.logger.warning("Notification type not supported yet for notification %s", notif)

        ack = ua.SubscriptionAcknowledgement()
        ack.SubscriptionId = self.subscription_id
        ack.SequenceNumber = publishresult.NotificationMessage.SequenceNumber
        self.server.publish([ack])

    def _call_datachange(self, datachange):
        """
        Forward every item of a data change notification to the handler.
        Items with an unknown client handle are logged and skipped; exceptions
        raised by the handler are logged and do not stop the dispatch.
        """
        for item in datachange.MonitoredItems:
            with self._lock:
                if item.ClientHandle not in self._monitoreditems_map:
                    self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle)
                    continue
                data = self._monitoreditems_map[item.ClientHandle]
            # The handler is called outside the lock.
            if hasattr(self._handler, "datachange_notification"):
                event_data = DataChangeNotif(data, item)
                try:
                    self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
                except Exception:
                    self.logger.exception("Exception calling data change handler")
            elif hasattr(self._handler, "data_change"):  # deprecated API
                self.logger.warning("data_change method is deprecated, use datachange_notification")
                try:
                    self._handler.data_change(data.server_handle, data.node, item.Value.Value.Value, data.attribute)
                except Exception:
                    self.logger.exception("Exception calling deprecated data change handler")
            else:
                self.logger.error("DataChange subscription created but handler has no datachange_notification method")

    def _call_event(self, eventlist):
        """
        Forward every event of an event notification list to the handler.
        Events with an unknown client handle are logged and skipped, the same
        way _call_datachange does, so one stale event cannot abort the whole
        publish callback.
        """
        for event in eventlist.Events:
            with self._lock:
                data = self._monitoreditems_map.get(event.ClientHandle)
            if data is None:
                self.logger.warning("Received a notification for unknown handle: %s", event.ClientHandle)
                continue
            result = events.Event.from_event_fields(data.mfilter.SelectClauses, event.EventFields)
            result.server_handle = data.server_handle
            if hasattr(self._handler, "event_notification"):
                try:
                    self._handler.event_notification(result)
                except Exception:
                    self.logger.exception("Exception calling event handler")
            elif hasattr(self._handler, "event"):  # deprecated API
                try:
                    self._handler.event(data.server_handle, result)
                except Exception:
                    self.logger.exception("Exception calling deprecated event handler")
            else:
                self.logger.error("Event subscription created but handler has no event_notification method")

    def _call_status(self, status):
        """
        Forward a status change notification to the handler.
        Any exception (including a missing handler method) is logged.
        """
        try:
            self._handler.status_change_notification(status.Status)
        except Exception:
            self.logger.exception("Exception calling status change handler")

    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value):
        """
        Subscribe for data change events for a node or list of nodes.
        default attribute is Value.
        Return a handle which can be used to unsubscribe
        If more control is necessary use create_monitored_items method
        """
        return self._subscribe(nodes, attr, queuesize=0)

    def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtypes=ua.ObjectIds.BaseEventType, evfilter=None):
        """
        Subscribe to events from a node. Default node is Server node.
        In most servers the server node is the only one you can subscribe to.
        if evtypes is not provided, evtype defaults to BaseEventType
        if evtypes is a list or tuple of custom event types, the events will be filtered to the supplied types
        An explicit evfilter takes precedence: evtypes is then ignored.
        Return a handle which can be used to unsubscribe
        """
        sourcenode = Node(self.server, sourcenode)

        if evfilter is None:
            if not isinstance(evtypes, (list, tuple)):
                evtypes = [evtypes]

            evtypes = [Node(self.server, evtype) for evtype in evtypes]

            evfilter = events.get_filter_from_event_type(evtypes)
        return self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter)

    def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
        """
        Create one monitored item per node.

        :param nodes: a single node or a list/tuple of nodes
        :return: for a list/tuple, the list returned by create_monitored_items
            (a handle or a failure status code per node); for a single node,
            its handle
        :raise: for a single node whose creation failed, whatever the status
            code's check() raises
        """
        is_list = isinstance(nodes, (list, tuple))
        if not is_list:
            nodes = [nodes]
        mirs = [self._make_monitored_item_request(node, attr, mfilter, queuesize) for node in nodes]

        mids = self.create_monitored_items(mirs)
        if is_list:
            return mids
        if isinstance(mids[0], ua.StatusCode):
            mids[0].check()
        return mids[0]

    def _make_monitored_item_request(self, node, attr, mfilter, queuesize):
        """
        Build a MonitoredItemCreateRequest for one node/attribute, allocating
        a new client handle for it.
        """
        rv = ua.ReadValueId()
        rv.NodeId = node.nodeid
        rv.AttributeId = attr
        # rv.IndexRange //We leave it null, then the entire array is returned
        mparams = ua.MonitoringParameters()
        with self._lock:
            self._client_handle += 1
            mparams.ClientHandle = self._client_handle
        mparams.SamplingInterval = self.parameters.RequestedPublishingInterval
        mparams.QueueSize = queuesize
        mparams.DiscardOldest = True
        if mfilter:
            mparams.Filter = mfilter
        mir = ua.MonitoredItemCreateRequest()
        mir.ItemToMonitor = rv
        mir.MonitoringMode = ua.MonitoringMode.Reporting
        mir.RequestedParameters = mparams
        return mir

    def create_monitored_items(self, monitored_items):
        """
        low level method to have full control over subscription parameters
        Client handle must be unique since it will be used as key for internal registration of data

        :param monitored_items: list of MonitoredItemCreateRequest
        :return: list with, for every request and in the same order, the
            monitored item id on success or the failure status code
        """
        params = ua.CreateMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToCreate = monitored_items
        params.TimestampsToReturn = ua.TimestampsToReturn.Neither

        # insert monitored item into map to avoid notification arrive before result return
        # server_handle is left as None in purpose as we don't get it yet.
        with self._lock:
            for mi in monitored_items:
                data = SubscriptionItemData()
                data.client_handle = mi.RequestedParameters.ClientHandle
                data.node = Node(self.server, mi.ItemToMonitor.NodeId)
                data.attribute = mi.ItemToMonitor.AttributeId
                # TODO: Either use the filter from request or from response. Here it uses from request, in modify it uses from response
                data.mfilter = mi.RequestedParameters.Filter
                self._monitoreditems_map[mi.RequestedParameters.ClientHandle] = data
        try:
            results = self.server.create_monitored_items(params)
        except Exception:
            # The request never completed: drop the entries registered above
            # so the map does not keep items that were never created.
            with self._lock:
                for mi in monitored_items:
                    self._monitoreditems_map.pop(mi.RequestedParameters.ClientHandle, None)
            raise
        mids = []
        # process result, add server_handle, or remove it if failed
        with self._lock:
            for idx, result in enumerate(results):
                mi = params.ItemsToCreate[idx]
                if not result.StatusCode.is_good():
                    del self._monitoreditems_map[mi.RequestedParameters.ClientHandle]
                    mids.append(result.StatusCode)
                    continue
                data = self._monitoreditems_map[mi.RequestedParameters.ClientHandle]
                data.server_handle = result.MonitoredItemId
                mids.append(result.MonitoredItemId)
        return mids

    def unsubscribe(self, handle):
        """
        unsubscribe to datachange or events using the handle returned while subscribing
        if you delete subscription, you do not need to unsubscribe
        """
        params = ua.DeleteMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.MonitoredItemIds = [handle]
        results = self.server.delete_monitored_items(params)
        results[0].check()
        with self._lock:
            for k, v in self._monitoreditems_map.items():
                if v.server_handle == handle:
                    # Deleting while iterating is safe only because we return
                    # immediately and never advance the iterator again.
                    del self._monitoreditems_map[k]
                    return

    def modify_monitored_item(self, handle, new_samp_time, new_queuesize=0, mod_filter_val=-1):
        """
        Modify a monitored item.
        :param handle: Handle returned when originally subscribing
        :param new_samp_time: New wanted sample time
        :param new_queuesize: New wanted queuesize, default is 0
        :param mod_filter_val: New deadband filter value; None removes the
            filter, a negative value (default) keeps the current filter
        :return: Return a Modify Monitored Item Result
        :raise ValueError: if no monitored item is registered for handle
        """
        item_to_change = None
        with self._lock:
            for data in self._monitoreditems_map.values():
                if data.server_handle == handle:
                    item_to_change = data
                    break
        if item_to_change is None:
            raise ValueError("No monitored item found for handle: {}".format(handle))
        if mod_filter_val is None:
            mod_filter = None
        elif mod_filter_val < 0:
            mod_filter = item_to_change.mfilter
        else:
            mod_filter = ua.DataChangeFilter()
            mod_filter.Trigger = ua.DataChangeTrigger(1)  # send notification when status or value change
            mod_filter.DeadbandType = 1
            mod_filter.DeadbandValue = mod_filter_val  # absolute float value or from 0 to 100 for percentage deadband
        modif_item = ua.MonitoredItemModifyRequest()
        modif_item.MonitoredItemId = handle
        # Send the item's own client handle: notifications are looked up by
        # client handle, so it must keep the one it was registered under.
        modif_item.RequestedParameters = self._modify_monitored_item_request(
            new_queuesize, new_samp_time, mod_filter, item_to_change.client_handle)
        params = ua.ModifyMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToModify.append(modif_item)
        results = self.server.modify_monitored_items(params)
        item_to_change.mfilter = results[0].FilterResult
        return results

    def _modify_monitored_item_request(self, new_queuesize, new_samp_time, mod_filter, client_handle=None):
        """
        Build the MonitoringParameters of a modify request.

        :param client_handle: client handle to send; when None the most
            recently allocated handle is used (previous behaviour, kept for
            callers that do not pass it)
        """
        req_params = ua.MonitoringParameters()
        if client_handle is None:
            with self._lock:
                client_handle = self._client_handle
        req_params.ClientHandle = client_handle
        req_params.QueueSize = new_queuesize
        req_params.Filter = mod_filter
        req_params.SamplingInterval = new_samp_time
        return req_params

    def deadband_monitor(self, var, deadband_val, deadbandtype=1, queuesize=0, attr=ua.AttributeIds.Value):
        """
        Method to create a subscription with a Deadband Value.
        Default deadband value type is absolute.
        Return a handle which can be used to unsubscribe
        :param var: Variable to which you want to subscribe
        :param deadband_val: Absolute float value
        :param deadbandtype: Default value is 1 (absolute), change to 2 for percentage deadband
        :param queuesize: Wanted queue size, default is 0
        :param attr: Attribute to monitor, default is Value
        """
        deadband_filter = ua.DataChangeFilter()
        deadband_filter.Trigger = ua.DataChangeTrigger(1)  # send notification when status or value change
        deadband_filter.DeadbandType = deadbandtype
        deadband_filter.DeadbandValue = deadband_val  # absolute float value or from 0 to 100 for percentage deadband
        return self._subscribe(var, attr, deadband_filter, queuesize)
340