Completed
Push — master ( d80205...a96f9a )
by Olivier
08:51
created

Subscription.delete()   A

Complexity

Conditions 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 6
ccs 3
cts 3
cp 1
crap 1
rs 9.4285
1
"""
2
high level interface to subscriptions
3
"""
4 1
import time
5 1
import logging
6 1
from threading import Lock
7 1
import collections
8
9 1
from opcua import ua
10 1
from opcua.common import events
11 1
from opcua import Node
12
13
14 1
class SubHandler(object):
    """
    Sample subscription handler listing the callbacks used to receive
    notifications from the server for a subscription.

    This class only serves as an example: any object providing methods with
    these names can be used as handler. Every method here does nothing.
    """

    def data_change(self, handle, node, val, attr):
        """
        Deprecated callback, implement datachange_notification instead
        """

    def datachange_notification(self, node, val, data):
        """
        Invoked for each datachange notification received from the server
        """

    def event_notification(self, event):
        """
        Invoked for each event notification received from the server
        """

    def status_change_notification(self, status):
        """
        Invoked for each status change notification received from the server
        """
43
44
45 1
class SubscriptionItemData(object):
    """
    Plain record keeping the useful data of one monitored item
    """

    def __init__(self):
        # every field starts out unset (None)
        self.node = self.client_handle = self.server_handle = None
        self.attribute = self.mfilter = None
55
56
57 1
class DataChangeNotif(object):
    """
    Object handed to clients with every datachange notification from server.

    It bundles the stored subscription data with the monitored item carried
    by the notification.
    """

    def __init__(self, subscription_data, monitored_item):
        self.monitored_item = monitored_item
        self.subscription_data = subscription_data

    def __str__(self):
        template = "DataChangeNotification({0}, {1})"
        return template.format(self.subscription_data, self.monitored_item)

    # repr() and str() give the same text
    __repr__ = __str__
68
69
70 1
class Subscription(object):
71
    """
72
    Subscription object returned by Server or Client objects.
73
    The object represents a subscription to an OPC-UA server.
74
    This is a high level class, especially subscribe_data_change
75
    and subscribe_events methods. If more control is necessary look at
76
    code and/or use create_monitored_items method.
77
    """
78
79 1
    def __init__(self, server, params, handler):
        """
        Create the subscription on the server and queue the first publish requests.

        :param server: object on which create_subscription and publish are called
        :param params: subscription parameters, handed to server.create_subscription
        :param handler: object receiving the notifications, see SubHandler
        """
        self.logger = logging.getLogger(__name__)
        self.server = server
        self.parameters = params  # move to data class
        self._handler = handler
        self._lock = Lock()
        self._monitoreditems_map = {}
        # last client handle handed out; incremented before each use
        self._client_handle = 200
        # stays None until the server has answered: publish_callback waits on it
        self.subscription_id = None
        created = self.server.create_subscription(params, self.publish_callback)
        self.subscription_id = created.SubscriptionId  # move to data class

        # Launching two publish requests is a heuristic. We try to ensure
        # that the server always has at least one publish request in the queue,
        # even after it just replied to a publish request.
        for _ in range(2):
            self.server.publish()
96
97 1
    def delete(self):
        """
        Delete this subscription on the server.

        Client and Server classes do this automatically on exit.
        check() is called on the result returned for this subscription.
        """
        status = self.server.delete_subscriptions([self.subscription_id])[0]
        status.check()
103
104 1
    def publish_callback(self, publishresult):
        """
        Dispatch the notifications of a publish result, then acknowledge it.

        Every entry of NotificationMessage.NotificationData is routed to the
        matching _call_* method; unsupported notification types are logged.
        Afterwards an acknowledgement for the message sequence number is sent
        through a new publish request.

        :param publishresult: publish result handed over by the server object
        """
        self.logger.info("Publish callback called with result: %s", publishresult)
        # wait until __init__ has stored the id returned by create_subscription
        while self.subscription_id is None:
            time.sleep(0.01)

        message = publishresult.NotificationMessage
        # checked in this order, first matching type wins
        dispatch = (
            (ua.DataChangeNotification, self._call_datachange),
            (ua.EventNotificationList, self._call_event),
            (ua.StatusChangeNotification, self._call_status),
        )
        for notif in message.NotificationData:
            for notif_type, forward in dispatch:
                if isinstance(notif, notif_type):
                    forward(notif)
                    break
            else:
                self.logger.warning("Notification type not supported yet for notification %s", notif)

        ack = ua.SubscriptionAcknowledgement()
        ack.SequenceNumber = message.SequenceNumber
        ack.SubscriptionId = self.subscription_id
        self.server.publish([ack])
123
124 1
    def _call_datachange(self, datachange):
        """
        Forward every monitored item of a datachange notification to the handler.

        Items with a client handle missing from the internal map are logged and
        skipped. datachange_notification is preferred; the deprecated
        data_change method is used when only that one exists. Exceptions raised
        by the handler are logged and do not stop the processing.
        """
        handler = self._handler
        for item in datachange.MonitoredItems:
            client_handle = item.ClientHandle
            with self._lock:
                data = self._monitoreditems_map.get(client_handle)
            if data is None:
                self.logger.warning("Received a notification for unknown handle: %s", client_handle)
                continue

            if hasattr(handler, "datachange_notification"):
                notif = DataChangeNotif(data, item)
                try:
                    handler.datachange_notification(data.node, item.Value.Value.Value, notif)
                except Exception:
                    self.logger.exception("Exception calling data change handler")
                continue

            if hasattr(handler, "data_change"):  # deprecated API
                self.logger.warning("data_change method is deprecated, use datachange_notification")
                try:
                    handler.data_change(data.server_handle, data.node, item.Value.Value.Value, data.attribute)
                except Exception:
                    self.logger.exception("Exception calling deprecated data change handler")
                continue

            self.logger.error("DataChange subscription created but handler has no datachange_notification method")
145
146 1
    def _call_event(self, eventlist):
        """
        Forward every event of an event notification list to the handler.

        Events whose client handle is missing from the internal map (for
        instance because the item was removed by unsubscribe in the meantime)
        are logged and skipped, the same way _call_datachange treats them.
        event_notification is preferred; the deprecated event method is used
        when only that one exists. Exceptions raised by the handler are logged
        and do not stop the processing.
        """
        for event in eventlist.Events:
            with self._lock:
                data = self._monitoreditems_map.get(event.ClientHandle)
            if data is None:
                # a KeyError here would propagate through publish_callback and
                # prevent it from acknowledging the message
                self.logger.warning("Received a notification for unknown handle: %s", event.ClientHandle)
                continue
            result = events.Event.from_event_fields(data.mfilter.SelectClauses, event.EventFields)
            result.server_handle = data.server_handle
            if hasattr(self._handler, "event_notification"):
                try:
                    self._handler.event_notification(result)
                except Exception:
                    self.logger.exception("Exception calling event handler")
            elif hasattr(self._handler, "event"):  # deprecated API
                try:
                    self._handler.event(data.server_handle, result)
                except Exception:
                    self.logger.exception("Exception calling deprecated event handler")
            else:
                self.logger.error("Event subscription created but handler has no event_notification method")
164
165 1
    def _call_status(self, status):
        """
        Pass the Status of a status change notification to the handler.

        A handler without a status_change_notification method is reported with
        a plain error message, like the datachange and event paths do, instead
        of an AttributeError traceback. Exceptions raised by the handler are
        logged.
        """
        if not hasattr(self._handler, "status_change_notification"):
            self.logger.error("Status change notification received but handler has no status_change_notification method")
            return
        try:
            self._handler.status_change_notification(status.Status)
        except Exception:
            self.logger.exception("Exception calling status change handler")
170
171 1
    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value):
        """
        Subscribe to data change events of one node or of a list of nodes.

        The monitored attribute defaults to Value and the queue size is 0.
        Returns a handle usable with unsubscribe (a list when a list of nodes
        was given, see _subscribe).
        Use create_monitored_items when more control is necessary.
        """
        handles = self._subscribe(nodes, attr, queuesize=0)
        return handles
179
180 1
    def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtypes=ua.ObjectIds.BaseEventType, evfilter=None, queuesize=0):
        """
        Subscribe to events from a node. Default node is Server node.
        In most servers the server node is the only one you can subscribe to.
        if evtypes is not provided, evtype defaults to BaseEventType
        if evtypes is a list or tuple of custom event types, the events will be filtered to the supplied types
        if evfilter is provided it is used as is and evtypes is ignored
        Return a handle which can be used to unsubscribe
        """
        sourcenode = Node(self.server, sourcenode)

        if evfilter is None:
            # isinstance so that list/tuple subclasses are treated as sequences
            # of event types too, not wrapped as one single event type
            if not isinstance(evtypes, (list, tuple)):
                evtypes = [evtypes]

            evtypes = [Node(self.server, evtype) for evtype in evtypes]

            evfilter = events.get_filter_from_event_type(evtypes)
        return self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter, queuesize=queuesize)
198
199 1
    def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
        """
        Create monitored items for one node or for an iterable of nodes.

        :param nodes: a single node or an iterable of nodes
        :param attr: attribute id to monitor
        :param mfilter: optional filter set on every created item
        :param queuesize: requested queue size of every created item
        :return: for an iterable of nodes, the list returned by
            create_monitored_items; for a single node, its first element,
            after check() has been called on it if it is a status code
        """
        # the collections.Iterable alias was removed in Python 3.10
        try:
            from collections.abc import Iterable
        except ImportError:  # Python 2
            from collections import Iterable

        is_list = isinstance(nodes, Iterable)
        nodes = list(nodes) if is_list else [nodes]
        mirs = [self._make_monitored_item_request(node, attr, mfilter, queuesize) for node in nodes]

        mids = self.create_monitored_items(mirs)
        if is_list:
            return mids
        if isinstance(mids[0], ua.StatusCode):
            mids[0].check()
        return mids[0]
217
218 1
    def _make_monitored_item_request(self, node, attr, mfilter, queuesize):
        """
        Build the MonitoredItemCreateRequest for one node attribute.

        A new client handle is taken from the internal counter, and the
        sampling interval is the RequestedPublishingInterval of the
        subscription parameters.
        """
        target = ua.ReadValueId()
        target.NodeId = node.nodeid
        target.AttributeId = attr
        # target.IndexRange is left null, then the entire array is returned
        monitoring = ua.MonitoringParameters()
        with self._lock:
            handle = self._client_handle = self._client_handle + 1
            monitoring.ClientHandle = handle
        monitoring.SamplingInterval = self.parameters.RequestedPublishingInterval
        monitoring.QueueSize = queuesize
        monitoring.DiscardOldest = True
        if mfilter:
            monitoring.Filter = mfilter

        request = ua.MonitoredItemCreateRequest()
        request.ItemToMonitor = target
        request.MonitoringMode = ua.MonitoringMode.Reporting
        request.RequestedParameters = monitoring
        return request
237
238 1
    def create_monitored_items(self, monitored_items):
        """
        Low level method giving full control over the monitored item requests.

        Each request must carry a unique client handle, since that handle is
        the key under which the item data is registered internally.
        Returns a list holding, for every result, the monitored item id or,
        when the creation failed, the status code.
        """
        params = ua.CreateMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToCreate = monitored_items
        params.TimestampsToReturn = ua.TimestampsToReturn.Both

        # Register the items before sending the request, so that a notification
        # arriving before the response is not unknown.
        # server_handle stays None on purpose: it is not known yet.
        with self._lock:
            for request in monitored_items:
                requested = request.RequestedParameters
                entry = SubscriptionItemData()
                entry.client_handle = requested.ClientHandle
                entry.node = Node(self.server, request.ItemToMonitor.NodeId)
                entry.attribute = request.ItemToMonitor.AttributeId
                # TODO: Either use the filter from request or from response. Here it uses from request, in modify it uses from response
                entry.mfilter = requested.Filter
                self._monitoreditems_map[requested.ClientHandle] = entry

        results = self.server.create_monitored_items(params)

        # complete the entries with the server handle, drop those that failed
        mids = []
        with self._lock:
            for idx, result in enumerate(results):
                request = params.ItemsToCreate[idx]
                if result.StatusCode.is_good():
                    entry = self._monitoreditems_map[request.RequestedParameters.ClientHandle]
                    entry.server_handle = result.MonitoredItemId
                    mids.append(result.MonitoredItemId)
                else:
                    del self._monitoreditems_map[request.RequestedParameters.ClientHandle]
                    mids.append(result.StatusCode)
        return mids
273
274 1
    def unsubscribe(self, handle):
        """
        Stop monitoring the item identified by the handle returned while subscribing.

        Not needed when the whole subscription is deleted.
        check() is called on the server result before the item is removed from
        the internal map.
        """
        params = ua.DeleteMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.MonitoredItemIds = [handle]
        self.server.delete_monitored_items(params)[0].check()

        with self._lock:
            # find the first entry registered under this server handle
            for client_handle, data in self._monitoreditems_map.items():
                if data.server_handle == handle:
                    break
            else:
                return
            del self._monitoreditems_map[client_handle]
289
290 1
    def modify_monitored_item(self, handle, new_samp_time, new_queuesize=0, mod_filter_val=-1):
        """
        Modify a monitored item.

        :param handle: Handle returned when originally subscribing
        :param new_samp_time: New wanted sample time
        :param new_queuesize: New wanted queuesize, default is 0
        :param mod_filter_val: New deadband filter value. A negative value
            (the default) keeps the filter currently stored for the item,
            None sends no filter at all.
        :return: the results returned by server.modify_monitored_items
        :raises ValueError: if handle does not match any monitored item of
            this subscription
        """
        # the map is modified under this lock by create_monitored_items and
        # unsubscribe, so it must not be iterated without holding it
        with self._lock:
            item_to_change = next(
                (data for data in self._monitoreditems_map.values() if data.server_handle == handle),
                None)
        # the lock is released here: _modify_monitored_item_request acquires it again
        if item_to_change is None:
            raise ValueError("No monitored item with handle {0} in this subscription".format(handle))

        if mod_filter_val is None:
            mod_filter = None
        elif mod_filter_val < 0:
            mod_filter = item_to_change.mfilter
        else:
            mod_filter = ua.DataChangeFilter()
            mod_filter.Trigger = ua.DataChangeTrigger(1)  # send notification when status or value change
            mod_filter.DeadbandType = 1
            mod_filter.DeadbandValue = mod_filter_val  # absolute float value or from 0 to 100 for percentage deadband
        modif_item = ua.MonitoredItemModifyRequest()
        modif_item.MonitoredItemId = handle
        modif_item.RequestedParameters = self._modify_monitored_item_request(new_queuesize, new_samp_time,
                                                                             mod_filter, item_to_change.client_handle)
        params = ua.ModifyMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToModify.append(modif_item)
        results = self.server.modify_monitored_items(params)
        # NOTE(review): the filter stored here comes from the response, whereas
        # create_monitored_items stores the one from the request - confirm
        item_to_change.mfilter = results[0].FilterResult
        return results
322
323 1
    def _modify_monitored_item_request(self, new_queuesize, new_samp_time, mod_filter, client_handle):
        """
        Build the MonitoringParameters sent when modifying a monitored item.

        The client handle is assigned while holding the internal lock.
        """
        monitoring = ua.MonitoringParameters()
        with self._lock:
            monitoring.ClientHandle = client_handle
        monitoring.SamplingInterval = new_samp_time
        monitoring.Filter = mod_filter
        monitoring.QueueSize = new_queuesize
        return monitoring
331
332 1
    def deadband_monitor(self, var, deadband_val, deadbandtype=1, queuesize=0, attr=ua.AttributeIds.Value):
        """
        Subscribe to a variable using a data change filter with a deadband.

        Default deadband value type is absolute.
        Return a handle which can be used to unsubscribe
        :param var: Variable to which you want to subscribe
        :param deadband_val: Absolute float value or from 0 to 100 for percentage deadband
        :param deadbandtype: Default value is 1 (absolute), change to 2 for percentage deadband
        :param queuesize: Wanted queue size, default is 0
        :param attr: Attribute to monitor, default is Value
        """
        dc_filter = ua.DataChangeFilter()
        dc_filter.Trigger = ua.DataChangeTrigger(1)  # send notification when status or value change
        dc_filter.DeadbandValue = deadband_val
        dc_filter.DeadbandType = deadbandtype
        return self._subscribe(var, attr, mfilter=dc_filter, queuesize=queuesize)
347