Completed
Push — master ( f3adbc...5b1468 )
by Olivier
02:19
created

Subscription._modify_monitored_item_request()   A

Complexity

Conditions 2

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2.0185

Importance

Changes 3
Bugs 0 Features 2
Metric Value
cc 2
c 3
b 0
f 2
dl 0
loc 8
ccs 5
cts 6
cp 0.8333
crap 2.0185
rs 9.4285
1
"""
High-level interface to subscriptions.
"""
4 1
import time
5 1
import logging
6 1
from threading import Lock
7
8 1
from opcua import ua
9 1
from opcua.common import events
10
from opcua import Node
11
12 1
13
class SubHandler(object):
    """
    Sample subscription handler showing the callbacks a Subscription invokes.

    Subclassing is optional: any object that provides these methods can be
    passed as handler. Every method here is a no-op.
    """

    def data_change(self, handle, node, val, attr):
        """Deprecated callback, implement datachange_notification instead."""

    def datachange_notification(self, node, val, data):
        """Invoked for every data change notification received from the server."""

    def event_notification(self, event):
        """Invoked for every event notification received from the server."""

    def status_change_notification(self, status):
        """Invoked for every status change notification received from the server."""
42
43 1
44
class SubscriptionItemData(object):
    """
    Plain record holding what is known about one monitored item.

    All fields start as None and are filled in by whoever creates the item.
    """

    def __init__(self):
        self.node = None           # node being monitored
        self.client_handle = None  # handle chosen on the client side
        self.server_handle = None  # id assigned by the server
        self.attribute = None      # attribute id being monitored
        self.mfilter = None        # monitoring filter, if any

    def __repr__(self):
        # Readable representation; DataChangeNotif.__str__ embeds this object.
        return "SubscriptionItemData(node={!r}, client_handle={!r}, server_handle={!r}, attribute={!r})".format(
            self.node, self.client_handle, self.server_handle, self.attribute)
54
55 1
56
class DataChangeNotif(object):
    """
    Object sent to clients for every data change notification from the server.

    It bundles the stored subscription data with the raw monitored item.
    """

    def __init__(self, subscription_data, monitored_item):
        self.subscription_data = subscription_data
        self.monitored_item = monitored_item

    def __str__(self):
        template = "DataChangeNotification({}, {})"
        return template.format(self.subscription_data, self.monitored_item)

    # repr() and str() deliberately share one implementation
    __repr__ = __str__
67
68
69
class Subscription(object):
70 1
    """
    Subscription object returned by Server or Client objects.
    The object represents a subscription to an OPC UA server.
    This is a high-level class; see especially the subscribe_data_change
    and subscribe_events methods. If more control is necessary, look at
    the code and/or use the create_monitored_items method.
    """
77
78 1
    def __init__(self, server, params, handler):
        """
        Create the subscription on the server and queue the first publish requests.

        :param server: object providing create_subscription() and publish()
        :param params: subscription parameters handed to create_subscription()
        :param handler: object whose notification methods will be called
        """
        self.logger = logging.getLogger(__name__)
        self.server = server
        self._handler = handler
        self.parameters = params  # move to data class
        self._client_handle = 200  # counter used to allocate client handles
        self._monitoreditems_map = {}  # client handle -> SubscriptionItemData
        self._lock = Lock()
        # Must exist before create_subscription(): publish_callback waits
        # until this attribute is no longer None.
        self.subscription_id = None
        response = self.server.create_subscription(params, self.publish_callback)
        self.subscription_id = response.SubscriptionId  # move to data class
        # send two publish requests
        for _ in range(2):
            self.server.publish()
91 1
92
    def delete(self):
        """
        Delete this subscription on the server.

        Client and Server classes do this automatically on exit.
        The status returned for the subscription is verified with check().
        """
        status = self.server.delete_subscriptions([self.subscription_id])[0]
        status.check()
98
99
    def publish_callback(self, publishresult):
        """
        Handle one publish result: dispatch its notifications, then acknowledge.

        The method first waits until subscription_id has been set, routes each
        notification to the matching _call_* method, and finally sends a new
        publish request carrying the acknowledgement.
        """
        self.logger.info("Publish callback called with result: %s", publishresult)
        # The callback may fire before __init__ has stored the subscription id
        while self.subscription_id is None:
            time.sleep(0.01)

        # Checked in order, first match wins
        dispatch = (
            (ua.DataChangeNotification, self._call_datachange),
            (ua.EventNotificationList, self._call_event),
            (ua.StatusChangeNotification, self._call_status),
        )
        message = publishresult.NotificationMessage
        for notif in message.NotificationData:
            for notif_type, forward in dispatch:
                if isinstance(notif, notif_type):
                    forward(notif)
                    break
            else:
                self.logger.warning("Notification type not supported yet for notification %s", notif)

        ack = ua.SubscriptionAcknowledgement()
        ack.SubscriptionId = self.subscription_id
        ack.SequenceNumber = publishresult.NotificationMessage.SequenceNumber
        self.server.publish([ack])
118 1
119 1
    def _call_datachange(self, datachange):
        """
        Forward every item of a data change notification to the handler.

        Items whose client handle is not registered are logged and skipped.
        Exceptions raised by the handler are logged, never propagated.
        """
        for item in datachange.MonitoredItems:
            with self._lock:
                try:
                    data = self._monitoreditems_map[item.ClientHandle]
                except KeyError:
                    self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle)
                    continue

            handler = self._handler
            if hasattr(handler, "datachange_notification"):
                event_data = DataChangeNotif(data, item)
                try:
                    handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
                except Exception:
                    self.logger.exception("Exception calling data change handler")
                continue

            if not hasattr(handler, "data_change"):
                self.logger.error("DataChange subscription created but handler has no datachange_notification method")
                continue

            # deprecated API
            self.logger.warning("data_change method is deprecated, use datachange_notification")
            try:
                handler.data_change(data.server_handle, data.node, item.Value.Value.Value, data.attribute)
            except Exception:
                self.logger.exception("Exception calling deprecated data change handler")
140
141 1
    def _call_event(self, eventlist):
        """
        Forward every event of an event notification list to the handler.

        Events whose client handle is not registered are logged and skipped,
        the same way _call_datachange treats unknown handles, so that one
        stale event cannot abort the whole publish callback.
        Exceptions raised by the handler are logged, never propagated.
        """
        for event in eventlist.Events:
            with self._lock:
                data = self._monitoreditems_map.get(event.ClientHandle)
            if data is None:
                # The entry may already be gone: unsubscribe() and a failed
                # create_monitored_items() both remove map entries.
                self.logger.warning("Received a notification for unknown handle: %s", event.ClientHandle)
                continue
            result = events.Event.from_event_fields(data.mfilter.SelectClauses, event.EventFields)
            result.server_handle = data.server_handle
            if hasattr(self._handler, "event_notification"):
                try:
                    self._handler.event_notification(result)
                except Exception:
                    self.logger.exception("Exception calling event handler")
            elif hasattr(self._handler, "event"):  # deprecated API
                try:
                    self._handler.event(data.server_handle, result)
                except Exception:
                    self.logger.exception("Exception calling deprecated event handler")
            else:
                self.logger.error("Event subscription created but handler has no event_notification method")
159
160
    def _call_status(self, status):
        """
        Pass the status of a status change notification to the handler.

        Any exception coming out of the handler call is logged and swallowed.
        """
        new_status = None
        try:
            new_status = status.Status
            self._handler.status_change_notification(new_status)
        except Exception:
            self.logger.exception("Exception calling status change handler")
165 1
166 1
    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value):
        """
        Subscribe to data change notifications for one node or a list of nodes.

        The monitored attribute defaults to Value.
        Returns a handle (or a list of handles when a list was given) that can
        be used to unsubscribe.
        For finer control use the create_monitored_items method.
        """
        handles = self._subscribe(nodes, attr, queuesize=0)
        return handles
174 1
175 1
    def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtypes=ua.ObjectIds.BaseEventType, evfilter=None):
        """
        Subscribe to events from a node. Default node is the Server node.
        In most servers the server node is the only one you can subscribe to.

        :param sourcenode: node (or node id) emitting the events
        :param evtypes: a single event type or a list/tuple of event types,
            each given either as a Node or as something ua.NodeId() accepts;
            defaults to BaseEventType. Only used when evfilter is None.
        :param evfilter: ready-made event filter; when given, evtypes is ignored
        :return: a handle which can be used to unsubscribe
        """
        sourcenode = Node(self.server, sourcenode)

        if evfilter is None:
            # isinstance (not an exact type() match) so list/tuple subclasses count as sequences
            if not isinstance(evtypes, (list, tuple)):
                evtypes = [evtypes]

            # events.get_filter_from_event_type is given Node objects, so wrap anything else
            evtype_nodes = [
                evtype if isinstance(evtype, Node) else Node(self.server, ua.NodeId(evtype))
                for evtype in evtypes
            ]
            evfilter = events.get_filter_from_event_type(evtype_nodes)
        return self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter)
205 1
206 1
    def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
        """
        Create monitored items for one node or a sequence of nodes.

        :param nodes: a single node or a list/tuple of nodes
        :param attr: attribute id to monitor
        :param mfilter: optional monitoring filter applied to every item
        :param queuesize: requested queue size for every item
        :return: for a list/tuple input, the list returned by
            create_monitored_items (ids or status codes); for a single node,
            its monitored item id. If the single result is a status code,
            check() is called on it first (expected to raise on a bad status).
        """
        is_list = isinstance(nodes, (list, tuple))
        if not is_list:
            nodes = [nodes]
        mirs = [self._make_monitored_item_request(node, attr, mfilter, queuesize) for node in nodes]

        mids = self.create_monitored_items(mirs)
        if is_list:
            return mids
        if isinstance(mids[0], ua.StatusCode):
            mids[0].check()
        return mids[0]
222 1
223
    def _make_monitored_item_request(self, node, attr, mfilter, queuesize):
        """
        Build a MonitoredItemCreateRequest for one node attribute.

        A new client handle is allocated for the request. The sampling
        interval is taken from the subscription's requested publishing interval.
        """
        target = ua.ReadValueId()
        target.NodeId = node.nodeid
        target.AttributeId = attr
        # IndexRange is left null so that the entire array is returned

        monitoring = ua.MonitoringParameters()
        with self._lock:
            # the increment and the read must happen together
            self._client_handle += 1
            monitoring.ClientHandle = self._client_handle
        monitoring.SamplingInterval = self.parameters.RequestedPublishingInterval
        monitoring.QueueSize = queuesize
        monitoring.DiscardOldest = True
        if mfilter:
            monitoring.Filter = mfilter

        request = ua.MonitoredItemCreateRequest()
        request.ItemToMonitor = target
        request.MonitoringMode = ua.MonitoringMode.Reporting
        request.RequestedParameters = monitoring
        return request
242 1
243 1
    def create_monitored_items(self, monitored_items):
        """
        Low-level method giving full control over the monitored item requests.

        Each request's client handle must be unique, because it is the key
        under which the item's data is registered internally.

        :param monitored_items: list of monitored item create requests
        :return: list with, per request, the monitored item id on success or
            the status code on failure
        """
        request = ua.CreateMonitoredItemsParameters()
        request.SubscriptionId = self.subscription_id
        request.ItemsToCreate = monitored_items
        request.TimestampsToReturn = ua.TimestampsToReturn.Neither

        # Register the items before calling the server, so that a notification
        # arriving ahead of the response already finds its entry.
        # server_handle stays None on purpose: it is not known yet.
        with self._lock:
            for mi in monitored_items:
                entry = SubscriptionItemData()
                entry.client_handle = mi.RequestedParameters.ClientHandle
                entry.node = Node(self.server, mi.ItemToMonitor.NodeId)
                entry.attribute = mi.ItemToMonitor.AttributeId
                # TODO: Either use the filter from request or from response.
                # Here it uses from request, in modify it uses from response
                entry.mfilter = mi.RequestedParameters.Filter
                self._monitoreditems_map[mi.RequestedParameters.ClientHandle] = entry

        results = self.server.create_monitored_items(request)

        # Process the results: store the server handle, or drop the entry on failure
        outcome = []
        with self._lock:
            for idx, result in enumerate(results):
                client_handle = request.ItemsToCreate[idx].RequestedParameters.ClientHandle
                if result.StatusCode.is_good():
                    self._monitoreditems_map[client_handle].server_handle = result.MonitoredItemId
                    outcome.append(result.MonitoredItemId)
                else:
                    del self._monitoreditems_map[client_handle]
                    outcome.append(result.StatusCode)
        return outcome
278
279
    def unsubscribe(self, handle):
        """
        Stop monitoring the item identified by the handle returned when subscribing.

        Works for both data change and event subscriptions. Not needed when
        the whole subscription is deleted.
        """
        request = ua.DeleteMonitoredItemsParameters()
        request.SubscriptionId = self.subscription_id
        request.MonitoredItemIds = [handle]
        status = self.server.delete_monitored_items(request)[0]
        status.check()

        # Forget the first local entry that carries this server handle
        with self._lock:
            for client_handle, data in self._monitoreditems_map.items():
                if data.server_handle == handle:
                    break
            else:
                return
            del self._monitoreditems_map[client_handle]
294 1
295 1
    def modify_monitored_item(self, handle, new_samp_time, new_queuesize=0, mod_filter_val=-1):
        """
        Modify a monitored item.

        :param handle: Handle returned when originally subscribing
        :param new_samp_time: New wanted sample time
        :param new_queuesize: New wanted queuesize, default is 0
        :param mod_filter_val: New deadband filter value. None sends no filter,
            a negative value (the default) re-sends the item's current filter,
            any other value creates a new DataChangeFilter with DeadbandType 1
            and that deadband value.
        :return: Return a Modify Monitored Item Result
        :raises ValueError: if no monitored item with this handle is registered
        """
        # Look the item up under the lock. The lock is released before calling
        # _modify_monitored_item_request, which acquires the same lock.
        item_to_change = None
        with self._lock:
            for data in self._monitoreditems_map.values():
                if data.server_handle == handle:
                    item_to_change = data
                    break
        if item_to_change is None:
            # fail before anything is sent to the server
            raise ValueError("The monitored item was not found: {}".format(handle))

        if mod_filter_val is None:
            mod_filter = None
        elif mod_filter_val < 0:
            mod_filter = item_to_change.mfilter
        else:
            mod_filter = ua.DataChangeFilter()
            mod_filter.Trigger = ua.DataChangeTrigger(1)  # send notification when status or value change
            mod_filter.DeadbandType = 1
            mod_filter.DeadbandValue = mod_filter_val  # absolute float value or from 0 to 100 for percentage deadband

        modif_item = ua.MonitoredItemModifyRequest()
        modif_item.MonitoredItemId = handle
        modif_item.RequestedParameters = self._modify_monitored_item_request(new_queuesize, new_samp_time,
                                                                             mod_filter)
        # The helper fills in the most recently allocated client handle; the
        # request must carry this item's own handle, which is the key under
        # which the item is registered locally.
        modif_item.RequestedParameters.ClientHandle = item_to_change.client_handle

        params = ua.ModifyMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToModify.append(modif_item)
        results = self.server.modify_monitored_items(params)
        # remember the filter as reported back by the server
        item_to_change.mfilter = results[0].FilterResult
        return results
327
328 1
    def _modify_monitored_item_request(self, new_queuesize, new_samp_time, mod_filter):
        """
        Build the MonitoringParameters used in a modify request.

        NOTE(review): ClientHandle is set to the current value of the handle
        counter, i.e. the handle allocated most recently by
        _make_monitored_item_request, not necessarily the handle of the item
        being modified. Confirm this is intended.
        """
        with self._lock:
            current_handle = self._client_handle

        monitoring = ua.MonitoringParameters()
        monitoring.ClientHandle = current_handle
        monitoring.QueueSize = new_queuesize
        monitoring.Filter = mod_filter
        monitoring.SamplingInterval = new_samp_time
        return monitoring
336 1
337
    def deadband_monitor(self, var, deadband_val, deadbandtype=1, queuesize=0, attr=ua.AttributeIds.Value):
        """
        Subscribe to a variable using a deadband data change filter.

        The deadband type defaults to absolute.
        Returns a handle which can be used to unsubscribe.

        :param var: Variable to which you want to subscribe
        :param deadband_val: Absolute float value, or 0 to 100 for a percentage deadband
        :param deadbandtype: Default value is 1 (absolute), change to 2 for percentage deadband
        :param queuesize: Wanted queue size, default is 0
        :param attr: Attribute to monitor, default is Value
        """
        dbfilter = ua.DataChangeFilter()
        # notify when status or value change
        dbfilter.Trigger = ua.DataChangeTrigger(1)
        dbfilter.DeadbandType = deadbandtype
        dbfilter.DeadbandValue = deadband_val
        return self._subscribe(var, attr, dbfilter, queuesize)
352