Passed
Pull Request — master (#45)
by
unknown
02:29
created

Subscription.unsubscribe()   A

Complexity

Conditions 4

Size

Total Lines 16
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 11
nop 2
dl 0
loc 16
rs 9.85
c 0
b 0
f 0
1
"""
2
high level interface to subscriptions
3
"""
4
import asyncio
5
import logging
6
import collections
7
from typing import Union, List, Iterable
8
9
from asyncua import ua
10
from .events import Event, get_filter_from_event_type
11
from .node import Node
12
13
14
class SubHandler:
    """
    Example subscription handler listing the callbacks a subscription invokes.

    This class only serves as a template: any object that exposes methods with
    these names can be used as a handler. The implementations here do nothing.
    """

    def datachange_notification(self, node, val, data):
        """
        Called for every data change notification received from the server.

        :param node: node whose monitored attribute changed
        :param val: the new value
        :param data: object carrying the details of the notification
        """
        return None

    def event_notification(self, event):
        """
        Called for every event notification received from the server.

        :param event: the received event
        """
        return None

    def status_change_notification(self, status):
        """
        Called for every status change notification received from the server.

        :param status: the new status
        """
        return None
37
38
39
class SubscriptionItemData:
    """
    Plain record holding the bookkeeping data kept for one monitored item.

    All fields start out as None and are filled in by whoever registers the item.
    """

    def __init__(self):
        # Fields, in order: the monitored node, the client-side handle, the
        # server-side handle, the monitored attribute and the monitoring filter.
        self.node = self.client_handle = self.server_handle = self.attribute = self.mfilter = None
50
51
52
class DataChangeNotif:
    """
    Object handed to clients with every data change notification from the server.

    It pairs the locally stored subscription data with the monitored item
    notification that triggered the callback.
    """

    def __init__(self, subscription_data, monitored_item):
        self.monitored_item, self.subscription_data = monitored_item, subscription_data

    def __str__(self):
        template = "DataChangeNotification({0}, {1})"
        return template.format(self.subscription_data, self.monitored_item)

    # repr() and str() deliberately share the same implementation
    __repr__ = __str__
65
66
67
class Subscription:
68
    """
69
    Subscription object returned by Server or Client objects.
70
    The object represent a subscription to an opc-ua server.
71
    This is a high level class, especially `subscribe_data_change` and `subscribe_events methods`.
72
    If more control is necessary look at code and/or use `create_monitored_items method`.
73
    :param server: `InternalSession` or `UAClient`
74
    """
75
76
    def __init__(self, server, params, handler, loop=None):
        """
        Store the collaborators of the subscription; nothing is sent to the server here.

        :param server: object used to issue the subscription service calls
        :param params: subscription parameters passed on when the subscription is created
        :param handler: object receiving the notification callbacks
        :param loop: event loop to use, the current event loop if not given
        """
        if not loop:
            loop = asyncio.get_event_loop()
        self.loop = loop
        self.logger = logging.getLogger(__name__)
        self.server = server
        # counter used to hand out client handles; incremented before each use
        self._client_handle = 200
        self._handler = handler
        self.parameters = params  # move to data class
        # registry of monitored item data, keyed by client handle
        self._monitored_items = {}
        # remains None until `init` has created the subscription
        self.subscription_id = None
85
86
    async def init(self):
        """
        Create the subscription on the server and remember the returned subscription id.

        `publish_callback` is registered as the callback for publish results.
        """
        created = await self.server.create_subscription(self.parameters, callback=self.publish_callback)
        sub_id = created.SubscriptionId
        self.subscription_id = sub_id  # move to data class
        self.logger.info('Subscription created %s', sub_id)
91
92
    def publish_callback(self, publish_result: ua.PublishResult):
        """
        Handle a `PublishResult` by forwarding each contained notification.

        Data change, event and status change notifications are passed to the
        matching `_call_*` method; any other notification type is logged and skipped.

        :param publish_result: result delivered for this subscription
        """
        self.logger.info("Publish callback called with result: %s", publish_result)
        notifications = publish_result.NotificationMessage.NotificationData
        if notifications is None:
            # NOTE: the message mentions NotificationMessage although the checked
            # field is NotificationData; the text is kept unchanged.
            self.logger.warning("NotificationMessage is None.")
            return
        # checked in order, the first matching type wins
        dispatch = (
            (ua.DataChangeNotification, self._call_datachange),
            (ua.EventNotificationList, self._call_event),
            (ua.StatusChangeNotification, self._call_status),
        )
        for notif in notifications:
            for notif_type, forward in dispatch:
                if isinstance(notif, notif_type):
                    forward(notif)
                    break
            else:
                self.logger.warning("Notification type not supported yet for notification %s", notif)
109
110
    async def delete(self):
        """
        Delete this subscription on the server.

        Client and Server classes do this automatically on exit. The status
        returned for the subscription is checked with its `check()` method.
        """
        statuses = await self.server.delete_subscriptions([self.subscription_id])
        first_status = statuses[0]
        first_status.check()
116
117
    def _call_datachange(self, datachange: ua.DataChangeNotification):
        """
        Forward every monitored item of a data change notification to the handler.

        Items whose client handle is not registered are logged and skipped.
        Exceptions raised by the handler are logged and do not stop the
        processing of the remaining items.

        :param datachange: notification holding the changed monitored items
        """
        for changed in datachange.MonitoredItems:
            try:
                registered = self._monitored_items[changed.ClientHandle]
            except KeyError:
                self.logger.warning("Received a notification for unknown handle: %s", changed.ClientHandle)
                continue
            if not hasattr(self._handler, "datachange_notification"):
                self.logger.error("DataChange subscription created but handler has no datachange_notification method")
                continue
            notif = DataChangeNotif(registered, changed)
            try:
                self._handler.datachange_notification(registered.node, changed.Value.Value.Value, notif)
            except Exception:
                self.logger.exception("Exception calling data change handler")
131
132
    def _call_event(self, eventlist: ua.EventNotificationList):
        """
        Forward every event of an event notification list to the handler.

        Events whose client handle is not registered are logged and skipped, so
        one stale handle does not abort the processing of the remaining events.
        Exceptions raised by the handler are logged and processing continues.

        :param eventlist: notification holding the received events
        """
        for event in eventlist.Events:
            if event.ClientHandle not in self._monitored_items:
                # same treatment as data change notifications for unknown handles
                self.logger.warning("Received a notification for unknown handle: %s", event.ClientHandle)
                continue
            data = self._monitored_items[event.ClientHandle]
            # rebuild the event from the select clauses of the filter stored for this item
            result = Event.from_event_fields(data.mfilter.SelectClauses, event.EventFields)
            result.server_handle = data.server_handle
            if hasattr(self._handler, "event_notification"):
                try:
                    self._handler.event_notification(result)
                except Exception:
                    self.logger.exception("Exception calling event handler")
            else:
                self.logger.error("Event subscription created but handler has no event_notification method")
144
145
    def _call_status(self, status: ua.StatusChangeNotification):
        """
        Forward a status change notification to the handler.

        A handler without a `status_change_notification` method gets an error
        log entry instead of a logged `AttributeError` traceback. Exceptions
        raised by the handler itself are logged and swallowed.

        :param status: notification whose `Status` is passed to the handler
        """
        if not hasattr(self._handler, "status_change_notification"):
            self.logger.error("Status change received but handler has no status_change_notification method")
            return
        try:
            self._handler.status_change_notification(status.Status)
        except Exception:
            self.logger.exception("Exception calling status change handler")
150
151
    async def subscribe_data_change(self,
                                    nodes: Union[Node, Iterable[Node]],
                                    attr=ua.AttributeIds.Value,
                                    queuesize=0) -> Union[int, List[Union[int, ua.StatusCode]]]:
        """
        Subscribe to data change events for one or multiple nodes.

        The attribute monitored by default is `Value`.
        For a single Node the return value is an integer handle which can be used to
        modify/cancel the subscription; if subscribing fails an `UaStatusCodeError` is raised.
        For an Iterable of Nodes a List is returned holding, per node, either an integer
        handle or a ua.StatusCode object. A StatusCode marks a failed subscription
        (no exception is raised in this case).
        Use the `create_monitored_items` method directly if more control is needed.

        :param nodes: One Node or an Iterable of Nodes
        :param attr: The Node attribute you want to subscribe to
        :param queuesize: 0 or 1 for default queue size (shall be 1 - no queuing), n for FIFO queue
        :return: Handle for changing/cancelling of the subscription
        """
        handle = await self._subscribe(nodes, attr, queuesize=queuesize)
        return handle
171
172
    async def subscribe_events(self,
                               sourcenode: Node = ua.ObjectIds.Server,
                               evtypes=ua.ObjectIds.BaseEventType,
                               evfilter=None,
                               queuesize=0) -> int:
        """
        Subscribe to events from a node. Default node is Server node.
        In most servers the server node is the only one you can subscribe to.
        If evtypes is not provided, evtype defaults to BaseEventType.
        If evtypes is a list or tuple of custom event types, the events will be filtered to the supplied types.
        A handle (integer value) is returned which can be used to modify/cancel the subscription.

        :param sourcenode: node to subscribe to; it is wrapped in a `Node` bound to this subscription's server
        :param evtypes: a single event type or a list/tuple of event types; only used when `evfilter` is None
        :param evfilter: ready-made event filter; when None a filter is built from `evtypes`
        :param queuesize: 0 for default queue size, 1 for minimum queue size, n for FIFO queue,
        MaxUInt32 for max queue size
        :return: Handle for changing/cancelling of the subscription
        """
        sourcenode = Node(self.server, sourcenode)
        if evfilter is None:
            # isinstance also accepts list/tuple subclasses, which an exact type
            # comparison would wrongly wrap in another list
            if not isinstance(evtypes, (list, tuple)):
                evtypes = [evtypes]
            evtypes = [Node(self.server, evtype) for evtype in evtypes]
            evfilter = await get_filter_from_event_type(evtypes)
        return await self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter, queuesize=queuesize)
198
199
    async def _subscribe(self, nodes: Union[Node, Iterable[Node]],
                         attr, mfilter=None, queuesize=0) -> Union[int, List[Union[int, ua.StatusCode]]]:
        """
        Private low level method for subscribing.

        :param nodes: One Node or an Iterable of Nodes.
        :param attr: ua.AttributeId
        :param mfilter: MonitoringFilter
        :param queuesize: queue size
        :return: Integer handle or if multiple Nodes were given a List of Integer handles/ua.StatusCode
        :raises UaStatusCodeError: if a single Node was given and creating its monitored item failed
        """
        # The `collections.Iterable` alias was removed in Python 3.10; the ABC
        # lives in `collections.abc`. Imported locally under an alias so the
        # `typing.Iterable` used in the annotations is not shadowed.
        from collections.abc import Iterable as IterableABC

        is_list = isinstance(nodes, IterableABC)
        nodes = list(nodes) if is_list else [nodes]
        # Create List of MonitoredItemCreateRequest
        mirs = [self._make_monitored_item_request(node, attr, mfilter, queuesize) for node in nodes]
        # Await MonitoredItemCreateResult
        mids = await self.create_monitored_items(mirs)
        if is_list:
            # Return results for multiple nodes
            return mids
        # Check and return result for single node (raise `UaStatusCodeError` if subscription failed)
        if isinstance(mids[0], ua.StatusCode):
            mids[0].check()
        return mids[0]
229
230
    def _make_monitored_item_request(self, node: Node, attr, mfilter, queuesize) -> ua.MonitoredItemCreateRequest:
        """
        Build the create request for monitoring one attribute of one node.

        A new client handle is allocated by incrementing the internal counter.
        The sampling interval is taken from the subscription's requested
        publishing interval.

        :param node: node to monitor
        :param attr: attribute id to monitor
        :param mfilter: monitoring filter, only set on the request when truthy
        :param queuesize: requested queue size
        :return: request with monitoring mode `Reporting`
        """
        target = ua.ReadValueId()
        target.NodeId = node.nodeid
        target.AttributeId = attr
        # target.IndexRange is left unset, so the entire array is returned

        self._client_handle += 1
        monitoring = ua.MonitoringParameters()
        monitoring.ClientHandle = self._client_handle
        monitoring.SamplingInterval = self.parameters.RequestedPublishingInterval
        monitoring.QueueSize = queuesize
        monitoring.DiscardOldest = True
        if mfilter:
            monitoring.Filter = mfilter

        request = ua.MonitoredItemCreateRequest()
        request.ItemToMonitor = target
        request.MonitoringMode = ua.MonitoringMode.Reporting
        request.RequestedParameters = monitoring
        return request
248
249
    async def create_monitored_items(self, monitored_items) -> List[Union[int, ua.StatusCode]]:
        """
        Low level method giving full control over the monitored item parameters.

        Client handles must be unique, since they are the keys under which the
        item data is registered internally.

        :param monitored_items: create requests to send to the server
        :return: per request, the monitored item id on success or the StatusCode on failure
        """
        request = ua.CreateMonitoredItemsParameters()
        request.SubscriptionId = self.subscription_id
        request.ItemsToCreate = monitored_items
        request.TimestampsToReturn = ua.TimestampsToReturn.Both
        # Register the items before sending the request, so a notification arriving
        # before the result can already be resolved. server_handle stays None on
        # purpose: it is not known yet.
        for requested in monitored_items:
            handle = requested.RequestedParameters.ClientHandle
            entry = SubscriptionItemData()
            entry.client_handle = handle
            entry.node = Node(self.server, requested.ItemToMonitor.NodeId)
            entry.attribute = requested.ItemToMonitor.AttributeId
            # TODO: Either use the filter from request or from response.
            #  Here it uses from request, in modify it uses from response
            entry.mfilter = requested.RequestedParameters.Filter
            self._monitored_items[handle] = entry
        results = await self.server.create_monitored_items(request)
        outcome = []
        # Complete the registration with the server handle, or undo it on failure
        for position, result in enumerate(results):
            handle = request.ItemsToCreate[position].RequestedParameters.ClientHandle
            if result.StatusCode.is_good():
                self._monitored_items[handle].server_handle = result.MonitoredItemId
                outcome.append(result.MonitoredItemId)
            else:
                del self._monitored_items[handle]
                outcome.append(result.StatusCode)
        return outcome
282
283
    async def unsubscribe(self, handle: Union[int, List[int]]):
        """
        Unsubscribe from datachange or events using the handle returned while subscribing.
        If you delete the subscription, you do not need to unsubscribe.

        Every handle the server reports as deleted is removed from the local
        registry, and the status of every handle is checked (not only the first).

        :param handle: The handle (or list of handles) that was returned when subscribing to the node/nodes
        :raises UaStatusCodeError: presumably, from `check()` on the first status that is not good
        """
        handles = [handle] if isinstance(handle, int) else list(handle)
        if not handles:
            # nothing to delete; avoids a pointless request and indexing an empty result
            return
        params = ua.DeleteMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.MonitoredItemIds = handles
        results = await self.server.delete_monitored_items(params)
        # server handles whose deletion succeeded (results are matched to handles by position)
        deleted = {server_handle for server_handle, status in zip(handles, results) if status.is_good()}
        # collect the keys first: a dict must not change size while it is iterated
        stale_keys = [key for key, item in self._monitored_items.items() if item.server_handle in deleted]
        for key in stale_keys:
            del self._monitored_items[key]
        # report failures only after the successful deletions have been recorded locally
        for status in results:
            status.check()
299
300
    async def modify_monitored_item(self, handle: int, new_samp_time, new_queuesize=0, mod_filter_val=-1):
        """
        Modify a monitored item.

        :param handle: Handle returned when originally subscribing
        :param new_samp_time: New wanted sample time
        :param new_queuesize: New wanted queuesize, default is 0
        :param mod_filter_val: New deadband filter value. None sends no filter, a negative
            value (the default) re-sends the filter currently stored for the item, any
            other value creates a new deadband filter with that value.
        :return: Return a Modify Monitored Item Result
        :raises ValueError: if no registered monitored item has the given handle
        """
        # Find the monitored item in the monitored item registry.
        # The default passed to next() is required: without it a missing handle raises
        # StopIteration, which a coroutine turns into RuntimeError before the check below.
        item_to_change = next(
            (item for item in self._monitored_items.values() if item.server_handle == handle), None
        )
        if item_to_change is None:
            raise ValueError('The monitored item was not found.')
        if mod_filter_val is None:
            mod_filter = None
        elif mod_filter_val < 0:
            mod_filter = item_to_change.mfilter
        else:
            mod_filter = ua.DataChangeFilter()
            # send notification when status or value change
            mod_filter.Trigger = ua.DataChangeTrigger(1)
            mod_filter.DeadbandType = 1
            # absolute float value or from 0 to 100 for percentage deadband
            mod_filter.DeadbandValue = mod_filter_val
        modif_item = ua.MonitoredItemModifyRequest()
        modif_item.MonitoredItemId = handle
        modif_item.RequestedParameters = self._modify_monitored_item_request(
            new_queuesize, new_samp_time, mod_filter, item_to_change.client_handle
        )
        params = ua.ModifyMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToModify.append(modif_item)
        results = await self.server.modify_monitored_items(params)
        # remember the filter reported by the server for later modifications
        item_to_change.mfilter = results[0].FilterResult
        return results
335
336
    def _modify_monitored_item_request(self, new_queuesize, new_samp_time, mod_filter, client_handle):
        """
        Build the monitoring parameters used when modifying a monitored item.

        :param new_queuesize: queue size to request
        :param new_samp_time: sampling interval to request
        :param mod_filter: filter to request
        :param client_handle: client handle of the monitored item being modified
        :return: populated `ua.MonitoringParameters`
        """
        parameters = ua.MonitoringParameters()
        parameters.SamplingInterval = new_samp_time
        parameters.QueueSize = new_queuesize
        parameters.Filter = mod_filter
        parameters.ClientHandle = client_handle
        return parameters
343
344
    def deadband_monitor(self, var, deadband_val, deadbandtype=1, queuesize=0, attr=ua.AttributeIds.Value):
        """
        Subscribe to a variable using a deadband filter.

        This is a plain (non-async) method that returns the coroutine created by
        `_subscribe`; the caller has to await it to obtain the handle, which can
        then be used to unsubscribe.

        :param var: Variable to which you want to subscribe
        :param deadband_val: Absolute float value, or 0 to 100 for a percentage deadband
        :param deadbandtype: Default value is 1 (absolute), change to 2 for percentage deadband
        :param queuesize: Wanted queue size, default is 0
        :param attr: Attribute ID
        """
        change_filter = ua.DataChangeFilter()
        change_filter.DeadbandType = deadbandtype
        change_filter.DeadbandValue = deadband_val
        # send notification when status or value change
        change_filter.Trigger = ua.DataChangeTrigger(1)
        pending = self._subscribe(var, attr, change_filter, queuesize)
        return pending
362