Passed
Pull Request — master (#183)
by
unknown
02:12
created

Subscription.unsubscribe()   A

Complexity

Conditions 5

Size

Total Lines 18
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 13
nop 2
dl 0
loc 18
rs 9.2833
c 0
b 0
f 0
1
"""
2
high level interface to subscriptions
3
"""
4
import asyncio
5
import logging
6
import collections.abc
7
from typing import Union, List, Iterable
8
9
from asyncua import ua
10
from .events import Event, get_filter_from_event_type
11
from .node import Node
12
13
14
class SubHandler:
    """
    Template for a subscription handler, used to receive notifications from the server.

    This class is only a sample: every method below is a no-op. Any object that
    provides methods with these names can be used as a handler instead.
    """

    def datachange_notification(self, node, val, data):
        """
        Called for every datachange notification from the server.

        :param node: node the notification refers to
        :param val: value carried by the notification
        :param data: additional notification data
        """

    def event_notification(self, event):
        """
        Called for every event notification from the server.

        :param event: the received event
        """

    def status_change_notification(self, status):
        """
        Called for every status change notification from the server.

        :param status: the reported status
        """
37
38
39
class SubscriptionItemData:
    """
    Holder for the useful values kept about one monitored item.
    """

    def __init__(self):
        # Every field starts unset; whoever creates the instance fills them in.
        self.node = self.client_handle = self.server_handle = None
        self.attribute = self.mfilter = None
50
51
52
class DataChangeNotif:
    """
    Object handed to clients for every datachange notification from the server.

    It bundles the notified monitored item with the subscription data it belongs to.
    """

    def __init__(self, subscription_data, monitored_item):
        self.monitored_item, self.subscription_data = monitored_item, subscription_data

    def __str__(self):
        sub_data, item = self.subscription_data, self.monitored_item
        return f"DataChangeNotification({sub_data}, {item})"

    # repr() and str() give the same text.
    __repr__ = __str__
65
66
67
class Subscription:
    """
    Subscription object returned by Server or Client objects.
    The object represents a subscription to an opc-ua server.
    This is a high level class, especially the `subscribe_data_change` and `subscribe_events` methods.
    If more control is necessary look at the code and/or use the `create_monitored_items` method.

    :param server: `InternalSession` or `UAClient`
    :param params: Parameters passed to the server when the subscription is created in `init`
    :param handler: Object whose notification methods are called for incoming notifications
    :param loop: Event loop stored on the instance, defaults to `asyncio.get_event_loop()`
    """

    def __init__(self, server, params: ua.CreateSubscriptionParameters, handler: SubHandler, loop=None):
        self.loop = loop or asyncio.get_event_loop()
        self.logger = logging.getLogger(__name__)
        self.server = server
        # Counter incremented for every monitored item request to produce its client handle.
        self._client_handle = 200
        self._handler: SubHandler = handler
        self.parameters: ua.CreateSubscriptionParameters = params  # move to data class
        # Registry of `SubscriptionItemData`, keyed by client handle.
        self._monitored_items = {}
        self.subscription_id = None

    async def init(self):
        """
        Create the subscription on the server and store the returned subscription id.
        """
        response = await self.server.create_subscription(self.parameters, callback=self.publish_callback)
        self.subscription_id = response.SubscriptionId  # move to data class
        self.logger.info('Subscription created %s', self.subscription_id)

    async def publish_callback(self, publish_result: ua.PublishResult):
        """
        Handle `PublishResult` callback.
        Every notification in the result is dispatched to the matching handler method.
        """
        self.logger.info("Publish callback called with result: %s", publish_result)
        notifications = publish_result.NotificationMessage.NotificationData
        if notifications is None:
            self.logger.warning("NotificationMessage is None.")
            return
        for notif in notifications:
            if isinstance(notif, ua.DataChangeNotification):
                await self._call_datachange(notif)
            elif isinstance(notif, ua.EventNotificationList):
                await self._call_event(notif)
            elif isinstance(notif, ua.StatusChangeNotification):
                await self._call_status(notif)
            else:
                self.logger.warning("Notification type not supported yet for notification %s", notif)

    async def delete(self):
        """
        Delete subscription on server. This is automatically done by Client and Server classes on exit.
        """
        results = await self.server.delete_subscriptions([self.subscription_id])
        results[0].check()

    async def _call_datachange(self, datachange: ua.DataChangeNotification):
        """
        Forward every item of a datachange notification to `datachange_notification` of the handler.
        Items with an unregistered client handle are skipped, handler exceptions are logged.
        """
        for item in datachange.MonitoredItems:
            if item.ClientHandle not in self._monitored_items:
                self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle)
                continue
            data = self._monitored_items[item.ClientHandle]
            if hasattr(self._handler, "datachange_notification"):
                event_data = DataChangeNotif(data, item)
                try:
                    if asyncio.iscoroutinefunction(self._handler.datachange_notification):
                        await self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
                    else:
                        self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
                except Exception:
                    self.logger.exception("Exception calling data change handler")
            else:
                self.logger.error("DataChange subscription created but handler has no datachange_notification method")

    async def _call_event(self, eventlist: ua.EventNotificationList):
        """
        Forward every event of an event notification list to `event_notification` of the handler.
        Events with an unregistered client handle are skipped, handler exceptions are logged.
        """
        for event in eventlist.Events:
            # Same guard as in `_call_datachange`: one unknown handle must not abort the whole list.
            if event.ClientHandle not in self._monitored_items:
                self.logger.warning("Received a notification for unknown handle: %s", event.ClientHandle)
                continue
            data = self._monitored_items[event.ClientHandle]
            result = Event.from_event_fields(data.mfilter.SelectClauses, event.EventFields)
            result.server_handle = data.server_handle
            if hasattr(self._handler, "event_notification"):
                try:
                    if asyncio.iscoroutinefunction(self._handler.event_notification):
                        await self._handler.event_notification(result)
                    else:
                        self._handler.event_notification(result)
                except Exception:
                    self.logger.exception("Exception calling event handler")
            else:
                self.logger.error("Event subscription created but handler has no event_notification method")

    async def _call_status(self, status: ua.StatusChangeNotification):
        """
        Forward a status change notification to `status_change_notification` of the handler.
        Handler exceptions are logged.
        """
        if not hasattr(self._handler, "status_change_notification"):
            self.logger.error("StatusChange notification received but handler has no status_change_notification method")
            return
        try:
            if asyncio.iscoroutinefunction(self._handler.status_change_notification):
                await self._handler.status_change_notification(status.Status)
            else:
                self._handler.status_change_notification(status.Status)
        except Exception:
            self.logger.exception("Exception calling status change handler")

    async def subscribe_data_change(self,
                                    nodes: Union[Node, Iterable[Node]],
                                    attr=ua.AttributeIds.Value,
                                    queuesize=0) -> Union[int, List[Union[int, ua.StatusCode]]]:
        """
        Subscribe to data change events of one or multiple nodes.
        The default attribute used for the subscription is `Value`.
        Return value is a handle which can be used to modify/cancel the subscription.
        The handle is an integer value for single Nodes. If the creation of the subscription fails an
        `UaStatusCodeError` is raised.
        If multiple Nodes are supplied, a List of integers or ua.StatusCode objects is returned. A
        StatusCode object in the list indicates that the subscription for that Node has failed (no
        exception will be raised in this case).
        If more control is necessary the `create_monitored_items` method can be used directly.

        :param nodes: One Node or an Iterable of Nodes
        :param attr: The Node attribute you want to subscribe to
        :param queuesize: 0 or 1 for default queue size (shall be 1 - no queuing), n for FIFO queue
        :return: Handle for changing/cancelling of the subscription
        """
        return await self._subscribe(nodes, attr, queuesize=queuesize)

    async def subscribe_events(self,
                               sourcenode: Node = ua.ObjectIds.Server,
                               evtypes=ua.ObjectIds.BaseEventType,
                               evfilter=None,
                               queuesize=0) -> int:
        """
        Subscribe to events from a node. Default node is Server node.
        In most servers the server node is the only one you can subscribe to.
        If evtypes is not provided, evtype defaults to BaseEventType.
        If evtypes is a list or tuple of custom event types, the events will be filtered to the supplied types.
        A handle (integer value) is returned which can be used to modify/cancel the subscription.

        :param sourcenode: Node to subscribe to, wrapped in a `Node` before use
        :param evtypes: One event type or a list/tuple of event types, only used when no evfilter is given
        :param evfilter: Ready made event filter, if None a filter is built from evtypes
        :param queuesize: 0 for default queue size, 1 for minimum queue size, n for FIFO queue,
        MaxUInt32 for max queue size
        :return: Handle for changing/cancelling of the subscription
        """
        sourcenode = Node(self.server, sourcenode)
        if evfilter is None:
            if not isinstance(evtypes, (list, tuple)):
                evtypes = [evtypes]
            evtypes = [Node(self.server, evtype) for evtype in evtypes]
            evfilter = await get_filter_from_event_type(evtypes)
        return await self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter, queuesize=queuesize)

    async def _subscribe(self, nodes: Union[Node, Iterable[Node]],
                         attr, mfilter=None, queuesize=0) -> Union[int, List[Union[int, ua.StatusCode]]]:
        """
        Private low level method for subscribing.

        :param nodes: One Node or an Iterable of Nodes.
        :param attr: ua.AttributeId
        :param mfilter: MonitoringFilter
        :param queuesize: queue size
        :return: Integer handle or if multiple Nodes were given a List of Integer handles/ua.StatusCode
        """
        is_list = isinstance(nodes, collections.abc.Iterable)
        nodes = list(nodes) if is_list else [nodes]
        # Create List of MonitoredItemCreateRequest
        mirs = [self._make_monitored_item_request(node, attr, mfilter, queuesize) for node in nodes]
        # Await MonitoredItemCreateResult
        mids = await self.create_monitored_items(mirs)
        if is_list:
            # Return results for multiple nodes
            return mids
        # Check and return result for single node (raise `UaStatusCodeError` if subscription failed)
        if isinstance(mids[0], ua.StatusCode):
            mids[0].check()
        return mids[0]

    def _make_monitored_item_request(self, node: Node, attr, mfilter, queuesize) -> ua.MonitoredItemCreateRequest:
        """
        Build the create request for one monitored item and allocate a new client handle for it.

        :param node: Node to monitor
        :param attr: Attribute id to monitor
        :param mfilter: Monitoring filter, only set on the request when truthy
        :param queuesize: Requested queue size
        :return: The request, in `Reporting` mode, sampled at the requested publishing interval
        """
        rv = ua.ReadValueId()
        rv.NodeId = node.nodeid
        rv.AttributeId = attr
        # rv.IndexRange //We leave it null, then the entire array is returned
        mparams = ua.MonitoringParameters()
        self._client_handle += 1
        mparams.ClientHandle = self._client_handle
        mparams.SamplingInterval = self.parameters.RequestedPublishingInterval
        mparams.QueueSize = queuesize
        mparams.DiscardOldest = True
        if mfilter:
            mparams.Filter = mfilter
        mir = ua.MonitoredItemCreateRequest()
        mir.ItemToMonitor = rv
        mir.MonitoringMode = ua.MonitoringMode.Reporting
        mir.RequestedParameters = mparams
        return mir

    async def create_monitored_items(self, monitored_items) -> List[Union[int, ua.StatusCode]]:
        """
        low level method to have full control over subscription parameters.
        Client handle must be unique since it will be used as key for internal registration of data.

        :param monitored_items: List of `ua.MonitoredItemCreateRequest`
        :return: One entry per request: the monitored item id on success, the bad `ua.StatusCode` otherwise
        """
        params = ua.CreateMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToCreate = monitored_items
        params.TimestampsToReturn = ua.TimestampsToReturn.Both
        # insert monitored item into map to avoid notification arrive before result return
        # server_handle is left as None in purpose as we don't get it yet.
        for mi in monitored_items:
            data = SubscriptionItemData()
            data.client_handle = mi.RequestedParameters.ClientHandle
            data.node = Node(self.server, mi.ItemToMonitor.NodeId)
            data.attribute = mi.ItemToMonitor.AttributeId
            # TODO: Either use the filter from request or from response.
            #  Here it uses from request, in modify it uses from response
            data.mfilter = mi.RequestedParameters.Filter
            self._monitored_items[mi.RequestedParameters.ClientHandle] = data
        try:
            results = await self.server.create_monitored_items(params)
        except Exception:
            # The request failed as a whole: do not leave the placeholders registered above behind.
            for mi in monitored_items:
                self._monitored_items.pop(mi.RequestedParameters.ClientHandle, None)
            raise
        mids = []
        # process result, add server_handle, or remove it if failed
        for mi, result in zip(monitored_items, results):
            client_handle = mi.RequestedParameters.ClientHandle
            if not result.StatusCode.is_good():
                # pop() instead of del: a duplicated client handle may already have been removed
                self._monitored_items.pop(client_handle, None)
                mids.append(result.StatusCode)
                continue
            self._monitored_items[client_handle].server_handle = result.MonitoredItemId
            mids.append(result.MonitoredItemId)
        return mids

    async def unsubscribe(self, handle: Union[int, List[int]]):
        """
        Unsubscribe from datachange or events using the handle returned while subscribing.
        If you delete the subscription, you do not need to unsubscribe.
        The status of the first deleted item is checked and raises if it is bad; bad statuses of
        further items are logged as warnings.

        :param handle: The handle that was returned when subscribing to the node/nodes
        """
        # Materialize so that any iterable of handles can be sent and walked more than once;
        # None or an empty collection means there is nothing to do.
        handles = [handle] if isinstance(handle, int) else list(handle or ())
        if not handles:
            return
        params = ua.DeleteMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.MonitoredItemIds = handles
        results = await self.server.delete_monitored_items(params)
        for item_handle, result in zip(handles, results):
            if not result.is_good():
                self.logger.warning("Deleting monitored item %s failed: %s", item_handle, result)
        results[0].check()
        handle_map = {v.server_handle: k for k, v in self._monitored_items.items()}
        for item_handle in handles:
            if item_handle in handle_map:
                del self._monitored_items[handle_map[item_handle]]

    async def modify_monitored_item(self, handle: int, new_samp_time, new_queuesize=0, mod_filter_val=-1):
        """
        Modify a monitored item.

        :param handle: Handle returned when originally subscribing
        :param new_samp_time: New wanted sample time
        :param new_queuesize: New wanted queuesize, default is 0
        :param mod_filter_val: New deadband filter value. None sends no filter, a negative value
        (the default) re-sends the filter currently stored for the item
        :return: Return a Modify Monitored Item Result
        :raises ValueError: If no monitored item is registered for the handle
        """
        # Find the monitored item in the monitored item registry.
        # The None default is required: a bare next() would raise StopIteration, which a coroutine
        # turns into a RuntimeError instead of reaching the ValueError below.
        item_to_change = next(
            (item for item in self._monitored_items.values() if item.server_handle == handle), None
        )
        if item_to_change is None:
            raise ValueError('The monitored item was not found.')
        if mod_filter_val is None:
            mod_filter = None
        elif mod_filter_val < 0:
            mod_filter = item_to_change.mfilter
        else:
            mod_filter = ua.DataChangeFilter()
            # send notification when status or value change
            mod_filter.Trigger = ua.DataChangeTrigger(1)
            mod_filter.DeadbandType = 1
            # absolute float value or from 0 to 100 for percentage deadband
            mod_filter.DeadbandValue = mod_filter_val
        modif_item = ua.MonitoredItemModifyRequest()
        modif_item.MonitoredItemId = handle
        modif_item.RequestedParameters = self._modify_monitored_item_request(
            new_queuesize, new_samp_time, mod_filter, item_to_change.client_handle
        )
        params = ua.ModifyMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToModify.append(modif_item)
        results = await self.server.modify_monitored_items(params)
        # Keep the filter reported back by the server for later modifications.
        item_to_change.mfilter = results[0].FilterResult
        return results

    def _modify_monitored_item_request(self, new_queuesize, new_samp_time, mod_filter, client_handle):
        """
        Build the `ua.MonitoringParameters` used to modify a monitored item.

        :param new_queuesize: Requested queue size
        :param new_samp_time: Requested sampling interval
        :param mod_filter: Filter to set, may be None
        :param client_handle: Client handle of the item being modified
        :return: The filled in monitoring parameters
        """
        req_params = ua.MonitoringParameters()
        req_params.ClientHandle = client_handle
        req_params.QueueSize = new_queuesize
        req_params.Filter = mod_filter
        req_params.SamplingInterval = new_samp_time
        return req_params

    async def deadband_monitor(self, var, deadband_val, deadbandtype=1, queuesize=0, attr=ua.AttributeIds.Value):
        """
        Method to create a subscription with a Deadband Value.
        Default deadband value type is absolute.
        Return a handle which can be used to unsubscribe

        :param var: Variable to which you want to subscribe
        :param deadband_val: Absolute float value
        :param deadbandtype: Default value is 1 (absolute), change to 2 for percentage deadband
        :param queuesize: Wanted queue size, default is 0
        :param attr: Attribute ID
        """
        deadband_filter = ua.DataChangeFilter()
        # send notification when status or value change
        deadband_filter.Trigger = ua.DataChangeTrigger(1)
        deadband_filter.DeadbandType = deadbandtype
        # absolute float value or from 0 to 100 for percentage deadband
        deadband_filter.DeadbandValue = deadband_val
        return await self._subscribe(var, attr, deadband_filter, queuesize)
373