Completed
Push — master ( eb5e78...c75f71 )
by Olivier
04:30
created

Subscription.__init__()   A

Complexity

Conditions 1

Size

Total Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 1.0019

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 1
dl 0
loc 17
ccs 7
cts 8
cp 0.875
crap 1.0019
rs 9.4285
c 1
b 0
f 1
1
"""
2
high level interface to subscriptions
3
"""
4 1
import time
5 1
import logging
6 1
from threading import Lock
7
8 1
from opcua import ua
9 1
from opcua.common import events
10
from opcua import Node
11
12 1
13
class SubHandler(object):
    """
    Sample subscription handler.

    An object of this kind is given to a subscription in order to receive
    the notifications sent by the server. The class only illustrates the
    method names that are looked up; any object exposing these methods can
    be used in its place.
    """

    def data_change(self, handle, node, val, attr):
        """Deprecated callback, implement datachange_notification instead."""

    def datachange_notification(self, node, val, data):
        """Invoked for every data change notification received from the server."""

    def event_notification(self, event):
        """Invoked for every event notification received from the server."""

    def status_change_notification(self, status):
        """Invoked for every status change notification received from the server."""
42
43 1
44
class SubscriptionItemData(object):
    """
    Plain record holding what is known about one monitored item.
    """

    def __init__(self):
        # Every field starts out empty; the code that creates the record
        # is expected to fill them in.
        self.node = self.client_handle = self.server_handle = None
        self.attribute = self.mfilter = None
54
55 1
56
class DataChangeNotif(object):
    """
    Object sent to clients for every data change notification from the server.

    It bundles the stored subscription data of the item together with the
    monitored item notification itself.
    """

    def __init__(self, subscription_data, monitored_item):
        self.monitored_item = monitored_item
        self.subscription_data = subscription_data

    def __repr__(self):
        sub_data, item = self.subscription_data, self.monitored_item
        return "DataChangeNotification({}, {})".format(sub_data, item)

    # str() and repr() share the same rendering
    __str__ = __repr__
67
68
69
class Subscription(object):
70 1
    """
71 1
    Subscription object returned by Server or Client objects.
72 1
    The object represent a subscription to an opc-ua server.
73 1
    This is a high level class, especially subscribe_data_change
74 1
    and subscribe_events methods. If more control is necessary look at
75 1
    code and/or use create_monitored_items method.
76
    """
77
78 1
    def __init__(self, server, params, handler):
        """
        Create the subscription on the server and start publishing.

        :param server: object used to talk to the server; its
            create_subscription and publish methods are called here
        :param params: subscription parameters forwarded to create_subscription
        :param handler: object whose notification methods will be called
        """
        self.server = server
        self._handler = handler
        self.parameters = params  # move to data class
        self.logger = logging.getLogger(__name__)
        # counter for client handles; it is incremented before each use
        self._client_handle = 200
        self._lock = Lock()
        self._monitoreditems_map = {}
        # publish_callback waits while this is None, so it must be reset
        # before the subscription is created
        self.subscription_id = None
        created = self.server.create_subscription(params, self.publish_callback)
        self.subscription_id = created.SubscriptionId  # move to data class

        # Launching two publish requests is a heuristic. We try to ensure
        # that the server always has at least one publish request in the queue,
        # even after it just replied to a publish request.
        for _ in range(2):
            self.server.publish()
95
96
    def delete(self):
        """
        Delete this subscription on the server.

        Client and Server classes do this automatically on exit. The first
        result returned by the server is checked (presumably raising on a
        bad status -- see the result's check method).
        """
        first_result = self.server.delete_subscriptions([self.subscription_id])[0]
        first_result.check()
102 1
103 1
    def publish_callback(self, publishresult):
        """
        Handle a publish result: dispatch its notifications and acknowledge it.

        :param publishresult: publish result whose NotificationMessage holds
            the notifications and the sequence number to acknowledge
        """
        self.logger.info("Publish callback called with result: %s", publishresult)
        # The callback can fire before __init__ has stored the subscription
        # id; poll until it is available.
        while self.subscription_id is None:
            time.sleep(0.01)

        # ordered (notification type, handler) pairs; first match wins
        dispatch = (
            (ua.DataChangeNotification, self._call_datachange),
            (ua.EventNotificationList, self._call_event),
            (ua.StatusChangeNotification, self._call_status),
        )
        message = publishresult.NotificationMessage
        for notif in message.NotificationData:
            for notif_type, call in dispatch:
                if isinstance(notif, notif_type):
                    call(notif)
                    break
            else:
                self.logger.warning("Notification type not supported yet for notification %s", notif)

        # acknowledge the message and queue a new publish request
        ack = ua.SubscriptionAcknowledgement()
        ack.SubscriptionId = self.subscription_id
        ack.SequenceNumber = publishresult.NotificationMessage.SequenceNumber
        self.server.publish([ack])
122 1
123 1
    def _call_datachange(self, datachange):
        """
        Forward every item of a data change notification to the handler.

        Items whose client handle is not registered are logged and skipped.
        The handler's datachange_notification method is preferred; the
        deprecated data_change method is used as a fallback. Exceptions
        raised by the handler are logged and do not propagate.
        """
        for item in datachange.MonitoredItems:
            with self._lock:
                known = item.ClientHandle in self._monitoreditems_map
                if known:
                    data = self._monitoreditems_map[item.ClientHandle]
            if not known:
                self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle)
                continue

            if hasattr(self._handler, "datachange_notification"):
                event_data = DataChangeNotif(data, item)
                try:
                    self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
                except Exception:
                    self.logger.exception("Exception calling data change handler")
                continue

            if hasattr(self._handler, "data_change"):  # deprecated API
                self.logger.warning("data_change method is deprecated, use datachange_notification")
                try:
                    self._handler.data_change(data.server_handle, data.node, item.Value.Value.Value, data.attribute)
                except Exception:
                    self.logger.exception("Exception calling deprecated data change handler")
                continue

            self.logger.error("DataChange subscription created but handler has no datachange_notification method")
144 1
145
    def _call_event(self, eventlist):
        """
        Forward every event of an event notification list to the handler.

        Events whose client handle is not registered are logged and skipped,
        the same way unknown data change items are, so that one stray event
        cannot abort processing of the whole publish result. The handler's
        event_notification method is preferred; the deprecated event method
        is used as a fallback. Exceptions raised by the handler are logged
        and do not propagate.
        """
        for event in eventlist.Events:
            with self._lock:
                data = self._monitoreditems_map.get(event.ClientHandle)
            if data is None:
                self.logger.warning("Received a notification for unknown handle: %s", event.ClientHandle)
                continue
            # rebuild the event object from the select clauses of the
            # filter stored for this monitored item
            result = events.Event.from_event_fields(data.mfilter.SelectClauses, event.EventFields)
            result.server_handle = data.server_handle
            if hasattr(self._handler, "event_notification"):
                try:
                    self._handler.event_notification(result)
                except Exception:
                    self.logger.exception("Exception calling event handler")
            elif hasattr(self._handler, "event"):  # deprecated API
                try:
                    self._handler.event(data.server_handle, result)
                except Exception:
                    self.logger.exception("Exception calling deprecated event handler")
            else:
                self.logger.error("Event subscription created but handler has no event_notification method")
163 1
164 1
    def _call_status(self, status):
        """
        Pass the Status of a status change notification to the handler.

        Any exception, including a handler without a
        status_change_notification method, is logged and swallowed.
        """
        try:
            notify = self._handler.status_change_notification
            notify(status.Status)
        except Exception:
            self.logger.exception("Exception calling status change handler")
169 1
170 1
    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value):
        """
        Subscribe to data change events of a node or a list of nodes.

        The monitored attribute defaults to Value and the queue size is 0.
        Returns a handle (or a list of handles when a list of nodes is
        given) which can be used to unsubscribe.
        Use create_monitored_items when more control is necessary.
        """
        handles = self._subscribe(nodes, attr, mfilter=None, queuesize=0)
        return handles
178
179
    def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtypes=ua.ObjectIds.BaseEventType, evfilter=None):
        """
        Subscribe to events from a node. Default node is Server node.
        In most servers the server node is the only one you can subscribe to.

        :param sourcenode: node (or node id) emitting the events
        :param evtypes: a single event type or a list/tuple of event types;
            defaults to BaseEventType. Only used when evfilter is None, in
            which case the filter is built from these types.
        :param evfilter: ready-made event filter; when given, evtypes is ignored
        :return: a handle which can be used to unsubscribe
        """
        sourcenode = Node(self.server, sourcenode)

        if evfilter is None:
            # isinstance also accepts list/tuple subclasses, which an exact
            # type comparison would wrongly wrap as a single event type
            if not isinstance(evtypes, (list, tuple)):
                evtypes = [evtypes]

            evtypes = [Node(self.server, evtype) for evtype in evtypes]

            evfilter = events.get_filter_from_event_type(evtypes)
        return self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter)
197
198
    def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
        """
        Create monitored items for one node or a list/tuple of nodes.

        :param nodes: a single node or a list/tuple of nodes
        :param attr: attribute id to monitor
        :param mfilter: optional monitoring filter applied to every item
        :param queuesize: queue size requested for every item
        :return: for a list/tuple input, the list returned by
            create_monitored_items (handles, or status codes for failed
            items); for a single node, its handle. A single node whose
            creation failed has its status code checked first (presumably
            raising -- see StatusCode.check).
        """
        # isinstance rather than an exact type comparison so that
        # subclasses of list/tuple are treated as sequences too
        is_list = isinstance(nodes, (list, tuple))
        if not is_list:
            nodes = [nodes]
        mirs = [self._make_monitored_item_request(node, attr, mfilter, queuesize) for node in nodes]

        mids = self.create_monitored_items(mirs)
        if is_list:
            return mids
        if isinstance(mids[0], ua.StatusCode):
            mids[0].check()
        return mids[0]
214 1
215 1
    def _make_monitored_item_request(self, node, attr, mfilter, queuesize):
        """
        Build the creation request for one monitored item.

        A fresh client handle is allocated under the lock. The sampling
        interval is taken from the subscription's requested publishing
        interval, and the item is created in Reporting mode.
        """
        # allocate a new client handle
        with self._lock:
            self._client_handle += 1
            new_handle = self._client_handle

        monitoring = ua.MonitoringParameters()
        monitoring.ClientHandle = new_handle
        monitoring.SamplingInterval = self.parameters.RequestedPublishingInterval
        monitoring.QueueSize = queuesize
        monitoring.DiscardOldest = True
        if mfilter:
            monitoring.Filter = mfilter

        target = ua.ReadValueId()
        target.NodeId = node.nodeid
        target.AttributeId = attr
        # target.IndexRange is left null, then the entire array is returned

        request = ua.MonitoredItemCreateRequest()
        request.ItemToMonitor = target
        request.MonitoringMode = ua.MonitoringMode.Reporting
        request.RequestedParameters = monitoring
        return request
234 1
235 1
    def create_monitored_items(self, monitored_items):
        """
        low level method to have full control over subscription parameters
        Client handle must be unique since it will be used as key for internal registration of data

        :param monitored_items: list of monitored item creation requests
        :return: list with one entry per processed result: the monitored
            item id on success, or the status code when creation failed
        """
        params = ua.CreateMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToCreate = monitored_items
        params.TimestampsToReturn = ua.TimestampsToReturn.Both

        # insert monitored item into map to avoid notification arrive before result return
        # server_handle is left as None on purpose as we don't get it yet.
        with self._lock:
            for mi in monitored_items:
                data = SubscriptionItemData()
                data.client_handle = mi.RequestedParameters.ClientHandle
                data.node = Node(self.server, mi.ItemToMonitor.NodeId)
                data.attribute = mi.ItemToMonitor.AttributeId
                # TODO: Either use the filter from request or from response. Here it uses from request, in modify it uses from response
                data.mfilter = mi.RequestedParameters.Filter
                self._monitoreditems_map[mi.RequestedParameters.ClientHandle] = data
        try:
            results = self.server.create_monitored_items(params)
        except Exception:
            # the request did not complete: drop the provisional entries so
            # they do not linger in the map, then let the error propagate
            with self._lock:
                for mi in monitored_items:
                    self._monitoreditems_map.pop(mi.RequestedParameters.ClientHandle, None)
            raise
        mids = []
        # process result, add server_handle, or remove it if failed
        with self._lock:
            for mi, result in zip(monitored_items, results):
                client_handle = mi.RequestedParameters.ClientHandle
                if not result.StatusCode.is_good():
                    # pop() tolerates an entry already removed, e.g. when
                    # two failed requests carried the same client handle
                    self._monitoreditems_map.pop(client_handle, None)
                    mids.append(result.StatusCode)
                    continue
                data = self._monitoreditems_map[client_handle]
                data.server_handle = result.MonitoredItemId
                mids.append(result.MonitoredItemId)
        return mids
270
271
    def unsubscribe(self, handle):
        """
        Stop monitoring the item identified by handle.

        handle is the value returned when subscribing to data changes or
        events. There is no need to unsubscribe when the whole subscription
        is deleted. The server result is checked before the local
        registration of the item is removed.
        """
        request = ua.DeleteMonitoredItemsParameters()
        request.SubscriptionId = self.subscription_id
        request.MonitoredItemIds = [handle]
        outcome = self.server.delete_monitored_items(request)
        outcome[0].check()
        with self._lock:
            # forget the first registered item carrying this server handle
            for client_handle in self._monitoreditems_map:
                if self._monitoreditems_map[client_handle].server_handle == handle:
                    del self._monitoreditems_map[client_handle]
                    break
286 1
287 1
    def modify_monitored_item(self, handle, new_samp_time, new_queuesize=0, mod_filter_val=-1):
        """
        Modify a monitored item.

        :param handle: Handle returned when originally subscribing
        :param new_samp_time: New wanted sample time
        :param new_queuesize: New wanted queuesize, default is 0
        :param mod_filter_val: New deadband filter value. None sends no
            filter, a negative value (the default) re-sends the filter
            currently stored for the item, any other value creates an
            absolute deadband filter with that value.
        :return: Return a Modify Monitored Item Result
        :raises ValueError: if no monitored item is registered for handle
        """
        # Look the item up under the lock, like every other access to the map.
        with self._lock:
            item_to_change = next(
                (item for item in self._monitoreditems_map.values() if item.server_handle == handle),
                None,
            )
        if item_to_change is None:
            # fail before contacting the server instead of hitting an
            # unbound local variable further down
            raise ValueError("No monitored item registered for handle {}".format(handle))

        if mod_filter_val is None:
            mod_filter = None
        elif mod_filter_val < 0:
            mod_filter = item_to_change.mfilter
        else:
            mod_filter = ua.DataChangeFilter()
            mod_filter.Trigger = ua.DataChangeTrigger(1)  # send notification when status or value change
            mod_filter.DeadbandType = 1
            mod_filter.DeadbandValue = mod_filter_val  # absolute float value or from 0 to 100 for percentage deadband

        req_params = self._modify_monitored_item_request(new_queuesize, new_samp_time, mod_filter)
        # Keep the item's own client handle: notifications are looked up by
        # client handle, so sending a different one would route this item's
        # notifications to the wrong entry.
        req_params.ClientHandle = item_to_change.client_handle

        modif_item = ua.MonitoredItemModifyRequest()
        modif_item.MonitoredItemId = handle
        modif_item.RequestedParameters = req_params
        params = ua.ModifyMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToModify.append(modif_item)
        results = self.server.modify_monitored_items(params)
        item_to_change.mfilter = results[0].FilterResult
        return results
319
320
    def _modify_monitored_item_request(self, new_queuesize, new_samp_time, mod_filter, client_handle=None):
        """
        Build the monitoring parameters for a modify request.

        :param new_queuesize: requested queue size
        :param new_samp_time: requested sampling interval
        :param mod_filter: filter to send, may be None
        :param client_handle: client handle to put in the request. When
            omitted, the most recently allocated client handle is used
            (historical behaviour); callers modifying an existing item
            should pass that item's own client handle.
        :return: the filled MonitoringParameters
        """
        if client_handle is None:
            with self._lock:
                client_handle = self._client_handle
        req_params = ua.MonitoringParameters()
        req_params.ClientHandle = client_handle
        req_params.QueueSize = new_queuesize
        req_params.Filter = mod_filter
        req_params.SamplingInterval = new_samp_time
        return req_params
328 1
329 1
    def deadband_monitor(self, var, deadband_val, deadbandtype=1, queuesize=0, attr=ua.AttributeIds.Value):
        """
        Subscribe to a variable using a deadband filter.

        :param var: Variable to which you want to subscribe
        :param deadband_val: Absolute float value, or 0 to 100 for a percentage deadband
        :param deadbandtype: 1 (default) for absolute, 2 for percentage deadband
        :param queuesize: Wanted queue size, default is 0
        :param attr: attribute to monitor, default is Value
        :return: a handle which can be used to unsubscribe
        """
        dfilter = ua.DataChangeFilter()
        # trigger 1: send notification when status or value change
        dfilter.Trigger = ua.DataChangeTrigger(1)
        dfilter.DeadbandType = deadbandtype
        dfilter.DeadbandValue = deadband_val
        return self._subscribe(var, attr, mfilter=dfilter, queuesize=queuesize)
344