Completed
Pull Request — master (#579)
by
unknown
06:23
created

Subscription._call_datachange()   C

Complexity

Conditions 8

Size

Total Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 18.648

Importance

Changes 0
Metric Value
cc 8
c 0
b 0
f 0
dl 0
loc 21
ccs 9
cts 20
cp 0.45
crap 18.648
rs 5.8823
1
"""
2
high level interface to subscriptions
3
"""
4 1
import time
5 1
import logging
6 1
from threading import Lock
7 1
import collections
8
9 1
from opcua import ua
10 1
from opcua.common import events
11 1
from opcua import Node
12
13
14 1
class SubHandler(object):
15
    """
16
    Subscription Handler. To receive events from server for a subscription
17
    This class is just a sample class. Whatever class having these methods can be used
18
    """
19
20 1
    def data_change(self, handle, node, val, attr):
21
        """
22
        Deprecated, use datachange_notification
23
        """
24
        pass
25
26 1
    def datachange_notification(self, node, val, data):
27
        """
28
        called for every datachange notification from server
29
        """
30
        pass
31
32 1
    def event_notification(self, event):
33
        """
34
        called for every event notification from server
35
        """
36
        pass
37
38 1
    def status_change_notification(self, status):
39
        """
40
        called for every status change notification from server
41
        """
42
        pass
43
44
45 1
class SubscriptionItemData(object):
46
    """
47
    To store useful data from a monitored item
48
    """
49 1
    def __init__(self):
50 1
        self.node = None
51 1
        self.client_handle = None
52 1
        self.server_handle = None
53 1
        self.attribute = None
54 1
        self.mfilter = None
55
56
57 1
class DataChangeNotif(object):
58
    """
59
    To be send to clients for every datachange notification from server
60
    """
61 1
    def __init__(self, subscription_data, monitored_item):
62 1
        self.monitored_item = monitored_item
63 1
        self.subscription_data = subscription_data
64
65 1
    def __str__(self):
66
        return "DataChangeNotification({0}, {1})".format(self.subscription_data, self.monitored_item)
67 1
    __repr__ = __str__
68
69
70 1
class Subscription(object):
71
    """
72
    Subscription object returned by Server or Client objects.
73
    The object represent a subscription to an opc-ua server.
74
    This is a high level class, especially subscribe_data_change
75
    and subscribe_events methods. If more control is necessary look at
76
    code and/or use create_monitored_items method.
77
    """
78
79 1
    def __init__(self, server, params, handler):
80 1
        self.logger = logging.getLogger(__name__)
81 1
        self.server = server
82 1
        self._client_handle = 200
83 1
        self._handler = handler
84 1
        self.parameters = params  # move to data class
85 1
        self._monitoreditems_map = {}
86 1
        self._lock = Lock()
87 1
        self.subscription_id = None
88 1
        response = self.server.create_subscription(params, self.publish_callback)
89 1
        self.subscription_id = response.SubscriptionId  # move to data class
90
91
        # Launching two publish requests is a heuristic. We try to ensure
92
        # that the server always has at least one publish request in the queue,
93
        # even after it just replied to a publish request.
94 1
        self.server.publish()
95 1
        self.server.publish()
96
97 1
    def delete(self):
98
        """
99
        Delete subscription on server. This is automatically done by Client and Server classes on exit
100
        """
101 1
        results = self.server.delete_subscriptions([self.subscription_id])
102 1
        results[0].check()
103
104 1
    def publish_callback(self, publishresult):
105 1
        self.logger.info("Publish callback called with result: %s", publishresult)
106 1
        while self.subscription_id is None:
107 1
            time.sleep(0.01)
108
109 1
        for notif in publishresult.NotificationMessage.NotificationData:
110 1
            if isinstance(notif, ua.DataChangeNotification):
111 1
                self._call_datachange(notif)
112 1
            elif isinstance(notif, ua.EventNotificationList):
113 1
                self._call_event(notif)
114
            elif isinstance(notif, ua.StatusChangeNotification):
115
                self._call_status(notif)
116
            else:
117
                self.logger.warning("Notification type not supported yet for notification %s", notif)
118
119 1
        ack = ua.SubscriptionAcknowledgement()
120 1
        ack.SubscriptionId = self.subscription_id
121 1
        ack.SequenceNumber = publishresult.NotificationMessage.SequenceNumber
122 1
        self.server.publish([ack])
123
124 1
    def _call_datachange(self, datachange):
125 1
        for item in datachange.MonitoredItems:
126 1
            with self._lock:
127 1
                if item.ClientHandle not in self._monitoreditems_map:
128
                    self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle)
129
                    continue
130 1
                data = self._monitoreditems_map[item.ClientHandle]
131 1
            if hasattr(self._handler, "datachange_notification"):
132 1
                event_data = DataChangeNotif(data, item)
133 1
                try:
134 1
                    self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
135
                except Exception:
136
                    self.logger.exception("Exception calling data change handler")
137
            elif hasattr(self._handler, "data_change"):  # deprecated API
138
                self.logger.warning("data_change method is deprecated, use datachange_notification")
139
                try:
140
                    self._handler.data_change(data.server_handle, data.node, item.Value.Value.Value, data.attribute)
141
                except Exception:
142
                    self.logger.exception("Exception calling deprecated data change handler")
143
            else:
144
                self.logger.error("DataChange subscription created but handler has no datachange_notification method")
145
146 1
    def _call_event(self, eventlist):
147 1
        for event in eventlist.Events:
148 1
            with self._lock:
149 1
                data = self._monitoreditems_map[event.ClientHandle]
150 1
            result = events.Event.from_event_fields(data.mfilter.SelectClauses, event.EventFields)
151 1
            result.server_handle = data.server_handle
152 1
            if hasattr(self._handler, "event_notification"):
153 1
                try:
154 1
                    self._handler.event_notification(result)
155
                except Exception:
156
                    self.logger.exception("Exception calling event handler")
157
            elif hasattr(self._handler, "event"):  # depcrecated API
158
                try:
159
                    self._handler.event(data.server_handle, result)
160
                except Exception:
161
                    self.logger.exception("Exception calling deprecated event handler")
162
            else:
163
                self.logger.error("Event subscription created but handler has no event_notification method")
164
165 1
    def _call_status(self, status):
166
        try:
167
            self._handler.status_change_notification(status.Status)
168
        except Exception:
169
            self.logger.exception("Exception calling status change handler")
170
171 1
    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value):
172
        """
173
        Subscribe for data change events for a node or list of nodes.
174
        default attribute is Value.
175
        Return a handle which can be used to unsubscribe
176
        If more control is necessary use create_monitored_items method
177
        """
178 1
        return self._subscribe(nodes, attr, queuesize=0)
179
180 1
    def subscribe_data_timestamp_change(self, nodes, attr=ua.AttributeIds.Value):
181
        """
182
        Subscribe for data and timestamp change events for a node or list of nodes.
183
        default attribute is Value.
184
        Return a handle which can be used to unsubscribe
185
        If more control is necessary use create_monitored_items method
186
        """
187 1
        timestamp_filter = ua.DataChangeFilter()
188 1
        timestamp_filter.Trigger = ua.DataChangeTrigger(2) # send notification when status, value or timestamp change
189 1
        return self._subscribe(nodes, attr, mfilter=timestamp_filter, queuesize=0)
190
191 1
    def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtypes=ua.ObjectIds.BaseEventType, evfilter=None, queuesize=0):
192
        """
193
        Subscribe to events from a node. Default node is Server node.
194
        In most servers the server node is the only one you can subscribe to.
195
        if evtypes is not provided, evtype defaults to BaseEventType
196
        if evtypes is a list or tuple of custom event types, the events will be filtered to the supplied types
197
        Return a handle which can be used to unsubscribe
198
        """
199 1
        sourcenode = Node(self.server, sourcenode)
200
201 1
        if evfilter is None:
202 1
            if not type(evtypes) in (list, tuple):
203 1
                evtypes = [evtypes]
204
205 1
            evtypes = [Node(self.server, evtype) for evtype in evtypes]
206
207 1
            evfilter = events.get_filter_from_event_type(evtypes)
208 1
        return self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter, queuesize=queuesize)
209
210 1
    def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
211 1
        is_list = True
212 1
        if isinstance(nodes, collections.Iterable):
213 1
            nodes = list(nodes)
214
        else:
215 1
            nodes = [nodes]
216 1
            is_list = False
217 1
        mirs = []
218 1
        for node in nodes:
219 1
            mir = self._make_monitored_item_request(node, attr, mfilter, queuesize)
220 1
            mirs.append(mir)
221
222 1
        mids = self.create_monitored_items(mirs)
223 1
        if is_list:
224 1
            return mids
225 1
        if type(mids[0]) == ua.StatusCode:
226 1
            mids[0].check()
227 1
        return mids[0]
228
229 1
    def _make_monitored_item_request(self, node, attr, mfilter, queuesize):
230 1
        rv = ua.ReadValueId()
231 1
        rv.NodeId = node.nodeid
232 1
        rv.AttributeId = attr
233
        # rv.IndexRange //We leave it null, then the entire array is returned
234 1
        mparams = ua.MonitoringParameters()
235 1
        with self._lock:
236 1
            self._client_handle += 1
237 1
            mparams.ClientHandle = self._client_handle
238 1
        mparams.SamplingInterval = self.parameters.RequestedPublishingInterval
239 1
        mparams.QueueSize = queuesize
240 1
        mparams.DiscardOldest = True
241 1
        if mfilter:
242 1
            mparams.Filter = mfilter
243 1
        mir = ua.MonitoredItemCreateRequest()
244 1
        mir.ItemToMonitor = rv
245 1
        mir.MonitoringMode = ua.MonitoringMode.Reporting
246 1
        mir.RequestedParameters = mparams
247 1
        return mir
248
249 1
    def create_monitored_items(self, monitored_items):
250
        """
251
        low level method to have full control over subscription parameters
252
        Client handle must be unique since it will be used as key for internal registration of data
253
        """
254 1
        params = ua.CreateMonitoredItemsParameters()
255 1
        params.SubscriptionId = self.subscription_id
256 1
        params.ItemsToCreate = monitored_items
257 1
        params.TimestampsToReturn = ua.TimestampsToReturn.Both
258
259
        # insert monitored item into map to avoid notification arrive before result return
260
        # server_handle is left as None in purpose as we don't get it yet.
261 1
        with self._lock:
262 1
            for mi in monitored_items:
263 1
                data = SubscriptionItemData()
264 1
                data.client_handle = mi.RequestedParameters.ClientHandle
265 1
                data.node = Node(self.server, mi.ItemToMonitor.NodeId)
266 1
                data.attribute = mi.ItemToMonitor.AttributeId
267
                #TODO: Either use the filter from request or from response. Here it uses from request, in modify it uses from response
268 1
                data.mfilter = mi.RequestedParameters.Filter
269 1
                self._monitoreditems_map[mi.RequestedParameters.ClientHandle] = data
270 1
        results = self.server.create_monitored_items(params)
271 1
        mids = []
272
        # process result, add server_handle, or remove it if failed
273 1
        with self._lock:
274 1
            for idx, result in enumerate(results):
275 1
                mi = params.ItemsToCreate[idx]
276 1
                if not result.StatusCode.is_good():
277 1
                    del self._monitoreditems_map[mi.RequestedParameters.ClientHandle]
278 1
                    mids.append(result.StatusCode)
279 1
                    continue
280 1
                data = self._monitoreditems_map[mi.RequestedParameters.ClientHandle]
281 1
                data.server_handle = result.MonitoredItemId
282 1
                mids.append(result.MonitoredItemId)
283 1
        return mids
284
285 1
    def unsubscribe(self, handle):
286
        """
287
        unsubscribe to datachange or events using the handle returned while subscribing
288
        if you delete subscription, you do not need to unsubscribe
289
        """
290 1
        params = ua.DeleteMonitoredItemsParameters()
291 1
        params.SubscriptionId = self.subscription_id
292 1
        params.MonitoredItemIds = [handle]
293 1
        results = self.server.delete_monitored_items(params)
294 1
        results[0].check()
295 1
        with self._lock:
296 1
            for k, v in self._monitoreditems_map.items():
297 1
                if v.server_handle == handle:
298 1
                    del(self._monitoreditems_map[k])
299 1
                    return
300
301 1
    def modify_monitored_item(self, handle, new_samp_time, new_queuesize=0, mod_filter_val=-1):
302
        """
303
        Modify a monitored item.
304
        :param handle: Handle returned when originally subscribing
305
        :param new_samp_time: New wanted sample time
306
        :param new_queuesize: New wanted queuesize, default is 0
307
        :param mod_filter_val: New deadband filter value
308
        :return: Return a Modify Monitored Item Result
309
        """
310
        for monitored_item_index in self._monitoreditems_map:
311
            if self._monitoreditems_map[monitored_item_index].server_handle == handle:
312
                item_to_change = self._monitoreditems_map[monitored_item_index]
313
                break
314
        if mod_filter_val is None:
315
            mod_filter = None
316
        elif mod_filter_val < 0:
317
            mod_filter = item_to_change.mfilter
318
        else:
319
            mod_filter = ua.DataChangeFilter()
320
            mod_filter.Trigger = ua.DataChangeTrigger(1)  # send notification when status or value change
321
            mod_filter.DeadbandType = 1
322
            mod_filter.DeadbandValue = mod_filter_val  # absolute float value or from 0 to 100 for percentage deadband
323
        modif_item = ua.MonitoredItemModifyRequest()
324
        modif_item.MonitoredItemId = handle
325
        modif_item.RequestedParameters = self._modify_monitored_item_request(new_queuesize, new_samp_time,
326
                                                                             mod_filter, item_to_change.client_handle)
327
        params = ua.ModifyMonitoredItemsParameters()
328
        params.SubscriptionId = self.subscription_id
329
        params.ItemsToModify.append(modif_item)
330
        results = self.server.modify_monitored_items(params)
331
        item_to_change.mfilter = results[0].FilterResult
332
        return results
333
334 1
    def _modify_monitored_item_request(self, new_queuesize, new_samp_time, mod_filter, client_handle):
335
        req_params = ua.MonitoringParameters()
336
        with self._lock:
337
            req_params.ClientHandle = client_handle
338
        req_params.QueueSize = new_queuesize
339
        req_params.Filter = mod_filter
340
        req_params.SamplingInterval = new_samp_time
341
        return req_params
342
343 1
    def deadband_monitor(self, var, deadband_val, deadbandtype=1, queuesize=0, attr=ua.AttributeIds.Value):
344
        """
345
        Method to create a subscription with a Deadband Value.
346
        Default deadband value type is absolute.
347
        Return a handle which can be used to unsubscribe
348
        :param var: Variable to which you want to subscribe
349
        :param deadband_val: Absolute float value
350
        :param deadbandtype: Default value is 1 (absolute), change to 2 for percentage deadband
351
        :param queuesize: Wanted queue size, default is 1
352
        """
353
        deadband_filter = ua.DataChangeFilter()
354
        deadband_filter.Trigger = ua.DataChangeTrigger(1)  # send notification when status or value change
355
        deadband_filter.DeadbandType = deadbandtype
356
        deadband_filter.DeadbandValue = deadband_val  # absolute float value or from 0 to 100 for percentage deadband
357
        return self._subscribe(var, attr, deadband_filter, queuesize)
358