Passed
Pull Request — master (#45)
by
unknown
02:35
created

Subscription.unsubscribe()   A

Complexity

Conditions 4

Size

Total Lines 16
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 11
nop 2
dl 0
loc 16
rs 9.85
c 0
b 0
f 0
1
"""
2
high level interface to subscriptions
3
"""
4
import asyncio
5
import logging
6
import collections
7
from typing import Union, List, Iterable
8
9
from asyncua import ua
10
from .events import Event, get_filter_from_event_type
11
from .node import Node
12
13
14
class SubHandler:
    """
    Sample subscription handler receiving notifications from the server.

    The class only documents the expected interface: any object providing
    methods with these names can be used as a handler instead.
    """

    def datachange_notification(self, node, val, data):
        """
        Invoked for each data change notification coming from the server.
        """
        return None

    def event_notification(self, event):
        """
        Invoked for each event notification coming from the server.
        """
        return None

    def status_change_notification(self, status):
        """
        Invoked for each status change notification coming from the server.
        """
        return None
37
38
39
class SubscriptionItemData:
    """
    Holds the useful data of one monitored item.
    """

    def __init__(self):
        # All fields start unset; they are filled in by the code registering the item.
        self.node = self.client_handle = self.server_handle = None
        self.attribute = self.mfilter = None
50
51
52
class DataChangeNotif:
    """
    Object sent to clients with every data change notification from the server.
    """

    def __init__(self, subscription_data, monitored_item):
        self.monitored_item, self.subscription_data = monitored_item, subscription_data

    def __str__(self):
        return "DataChangeNotification({}, {})".format(self.subscription_data, self.monitored_item)

    # repr and str share the same rendering
    __repr__ = __str__
65
66
67
class Subscription:
68
    """
69
    Subscription object returned by Server or Client objects.
70
    The object represent a subscription to an opc-ua server.
71
    This is a high level class, especially `subscribe_data_change` and `subscribe_events methods`.
72
    If more control is necessary look at code and/or use `create_monitored_items method`.
73
    :param server: `InternalSession` or `UAClient`
74
    """
75
76
    def __init__(self, server, params: ua.CreateSubscriptionParameters, handler: SubHandler, loop=None):
        """
        Store the subscription settings; nothing is sent to the server here.

        :param server: object used to issue the subscription service calls
        :param params: parameters used when the subscription is created
        :param handler: object receiving the notifications
        :param loop: event loop to use, defaults to `asyncio.get_event_loop()`
        """
        if not loop:
            loop = asyncio.get_event_loop()
        self.loop = loop
        self.logger = logging.getLogger(__name__)
        self.server = server
        # counter incremented to allocate the client handle of each monitored item request
        self._client_handle = 200
        self._handler: SubHandler = handler
        self.parameters: ua.CreateSubscriptionParameters = params  # move to data class
        # registry of SubscriptionItemData, keyed by client handle
        self._monitored_items = {}
        # unknown until `init` has been awaited
        self.subscription_id = None
85
86
    async def init(self):
        """
        Create the subscription on the server and store the id found in the response.
        """
        created = await self.server.create_subscription(self.parameters, callback=self.publish_callback)
        sub_id = created.SubscriptionId
        self.subscription_id = sub_id  # move to data class
        self.logger.info('Subscription created %s', sub_id)
90
91
    def publish_callback(self, publish_result: ua.PublishResult):
        """
        Handle a `PublishResult` callback by dispatching each contained
        notification to the matching `_call_*` method.
        """
        self.logger.info("Publish callback called with result: %s", publish_result)
        notifications = publish_result.NotificationMessage.NotificationData
        if notifications is None:
            self.logger.warning("NotificationMessage is None.")
            return
        # checked in order, first matching type wins
        dispatchers = (
            (ua.DataChangeNotification, self._call_datachange),
            (ua.EventNotificationList, self._call_event),
            (ua.StatusChangeNotification, self._call_status),
        )
        for notification in notifications:
            for notif_type, dispatch in dispatchers:
                if isinstance(notification, notif_type):
                    dispatch(notification)
                    break
            else:
                self.logger.warning("Notification type not supported yet for notification %s", notification)
108
109
    async def delete(self):
        """
        Delete the subscription on the server.

        Client and Server classes do this automatically on exit.
        The first returned result is checked with `check()`.
        """
        outcome = await self.server.delete_subscriptions([self.subscription_id])
        first = outcome[0]
        first.check()
115
116
    def _call_datachange(self, datachange: ua.DataChangeNotification):
        """
        Pass every monitored item of a data change notification to the handler.

        Items with an unregistered client handle are skipped with a warning.
        An exception raised by the handler is logged and the loop continues.
        """
        for monitored_item in datachange.MonitoredItems:
            handle = monitored_item.ClientHandle
            if handle not in self._monitored_items:
                self.logger.warning("Received a notification for unknown handle: %s", handle)
                continue
            item_data = self._monitored_items[handle]
            if not hasattr(self._handler, "datachange_notification"):
                self.logger.error("DataChange subscription created but handler has no datachange_notification method")
                continue
            notif = DataChangeNotif(item_data, monitored_item)
            try:
                self._handler.datachange_notification(item_data.node, monitored_item.Value.Value.Value, notif)
            except Exception:
                self.logger.exception("Exception calling data change handler")
130
131
    def _call_event(self, eventlist: ua.EventNotificationList):
        """
        Convert every entry of an event notification list to an `Event` and
        pass it to the handler.

        Entries with an unregistered client handle are skipped with a warning,
        as `_call_datachange` does, instead of raising `KeyError` out of the
        publish callback. An exception raised by the handler is logged and the
        loop continues.
        """
        for event in eventlist.Events:
            data = self._monitored_items.get(event.ClientHandle)
            if data is None:
                self.logger.warning("Received a notification for unknown handle: %s", event.ClientHandle)
                continue
            # the select clauses of the registered filter give the meaning of each event field
            result = Event.from_event_fields(data.mfilter.SelectClauses, event.EventFields)
            result.server_handle = data.server_handle
            if hasattr(self._handler, "event_notification"):
                try:
                    self._handler.event_notification(result)
                except Exception:
                    self.logger.exception("Exception calling event handler")
            else:
                self.logger.error("Event subscription created but handler has no event_notification method")
143
144
    def _call_status(self, status: ua.StatusChangeNotification):
        """
        Pass the status of a status change notification to the handler.

        A handler without a `status_change_notification` method is reported
        with an error log, like the other notification dispatchers do.
        An exception raised by the handler is logged, not propagated.
        """
        if not hasattr(self._handler, "status_change_notification"):
            self.logger.error("Status change received but handler has no status_change_notification method")
            return
        try:
            self._handler.status_change_notification(status.Status)
        except Exception:
            self.logger.exception("Exception calling status change handler")
149
150
    async def subscribe_data_change(self,
                                    nodes: Union[Node, Iterable[Node]],
                                    attr=ua.AttributeIds.Value,
                                    queuesize=0) -> Union[int, List[Union[int, ua.StatusCode]]]:
        """
        Subscribe to data change events of one or multiple nodes.
        The attribute subscribed to is `Value` unless another one is given.
        The return value is a handle which can be used to modify/cancel the subscription.
        For a single Node the handle is an integer; if the subscription fails an
        `UaStatusCodeError` is raised.
        For multiple Nodes a List of integers or ua.StatusCode objects is returned. A StatusCode
        object indicates that the subscription failed for that node (no exception is raised then).
        If more control is necessary the `create_monitored_items` method can be used directly.

        :param nodes: One Node or an Iterable of Nodes
        :param attr: The Node attribute you want to subscribe to
        :param queuesize: 0 or 1 for default queue size (shall be 1 - no queuing), n for FIFO queue
        :return: Handle for changing/cancelling of the subscription
        """
        handles = await self._subscribe(nodes, attr, queuesize=queuesize)
        return handles
170
171
    async def subscribe_events(self,
                               sourcenode: Node = ua.ObjectIds.Server,
                               evtypes=ua.ObjectIds.BaseEventType,
                               evfilter=None,
                               queuesize=0) -> int:
        """
        Subscribe to events from a node. Default node is Server node.
        In most servers the server node is the only one you can subscribe to.
        If evtypes is not provided, evtype defaults to BaseEventType.
        If evtypes is a list or tuple of custom event types, the events will be filtered to the supplied types.
        A handle (integer value) is returned which can be used to modify/cancel the subscription.

        :param sourcenode: node (or node id) to receive events from
        :param evtypes: one event type or a list/tuple of event types; only used when no `evfilter` is given
        :param evfilter: ready-made event filter; when None a filter is built from `evtypes`
        :param queuesize: 0 for default queue size, 1 for minimum queue size, n for FIFO queue,
        MaxUInt32 for max queue size
        :return: Handle for changing/cancelling of the subscription
        """
        sourcenode = Node(self.server, sourcenode)
        if evfilter is None:
            # isinstance also accepts list/tuple subclasses, which an exact type
            # comparison would wrongly wrap into a one-element list
            if not isinstance(evtypes, (list, tuple)):
                evtypes = [evtypes]
            evtypes = [Node(self.server, evtype) for evtype in evtypes]
            evfilter = await get_filter_from_event_type(evtypes)
        return await self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter, queuesize=queuesize)
197
198
    async def _subscribe(self, nodes: Union[Node, Iterable[Node]],
                         attr, mfilter=None, queuesize=0) -> Union[int, List[Union[int, ua.StatusCode]]]:
        """
        Private low level method for subscribing.

        :param nodes: One Node or an Iterable of Nodes.
        :param attr: ua.AttributeId
        :param mfilter: MonitoringFilter
        :param queuesize: queue size
        :return: Integer handle or if multiple Nodes were given a List of Integer handles/ua.StatusCode
        :raise: whatever `ua.StatusCode.check()` raises when a single Node was given and
            its subscription failed
        """
        # `collections.Iterable` was removed in Python 3.10; the ABC lives in `collections.abc`
        from collections.abc import Iterable as IterableABC

        is_list = isinstance(nodes, IterableABC)
        nodes = list(nodes) if is_list else [nodes]
        # Create List of MonitoredItemCreateRequest
        mirs = [self._make_monitored_item_request(node, attr, mfilter, queuesize) for node in nodes]
        # Await MonitoredItemCreateResult
        mids = await self.create_monitored_items(mirs)
        if is_list:
            # Return results for multiple nodes
            return mids
        # Check and return result for single node (raise `UaStatusCodeError` if subscription failed)
        if isinstance(mids[0], ua.StatusCode):
            mids[0].check()
        return mids[0]
228
229
    def _make_monitored_item_request(self, node: Node, attr, mfilter, queuesize) -> ua.MonitoredItemCreateRequest:
        """
        Build the create request for one monitored item.

        A new client handle is allocated by incrementing the internal counter.
        The sampling interval is taken from the subscription's requested publishing interval.
        """
        item_to_monitor = ua.ReadValueId()
        item_to_monitor.NodeId = node.nodeid
        item_to_monitor.AttributeId = attr
        # IndexRange is left null so that the entire array is returned

        requested = ua.MonitoringParameters()
        self._client_handle += 1
        requested.ClientHandle = self._client_handle
        requested.SamplingInterval = self.parameters.RequestedPublishingInterval
        requested.QueueSize = queuesize
        requested.DiscardOldest = True
        if mfilter:
            requested.Filter = mfilter

        request = ua.MonitoredItemCreateRequest()
        request.ItemToMonitor = item_to_monitor
        request.MonitoringMode = ua.MonitoringMode.Reporting
        request.RequestedParameters = requested
        return request
247
248
    async def create_monitored_items(self, monitored_items) -> List[Union[int, ua.StatusCode]]:
        """
        Low level method giving full control over the monitored item parameters.
        Client handles must be unique because they are used as keys of the internal registry.

        :param monitored_items: list of monitored item create requests
        :return: per request, the MonitoredItemId on success or the failing StatusCode
        """
        params = ua.CreateMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToCreate = monitored_items
        params.TimestampsToReturn = ua.TimestampsToReturn.Both
        # Register the items before sending the request, so that a notification
        # arriving before the result returns can already be resolved.
        # server_handle stays None on purpose: it is not known yet.
        for request in monitored_items:
            requested = request.RequestedParameters
            entry = SubscriptionItemData()
            entry.client_handle = requested.ClientHandle
            entry.node = Node(self.server, request.ItemToMonitor.NodeId)
            entry.attribute = request.ItemToMonitor.AttributeId
            # TODO: Either use the filter from request or from response.
            #  Here it uses from request, in modify it uses from response
            entry.mfilter = requested.Filter
            self._monitored_items[requested.ClientHandle] = entry
        results = await self.server.create_monitored_items(params)
        handles = []
        # Complete the registered entry with the server handle, or drop it on failure
        for position, outcome in enumerate(results):
            client_handle = params.ItemsToCreate[position].RequestedParameters.ClientHandle
            if outcome.StatusCode.is_good():
                self._monitored_items[client_handle].server_handle = outcome.MonitoredItemId
                handles.append(outcome.MonitoredItemId)
            else:
                del self._monitored_items[client_handle]
                handles.append(outcome.StatusCode)
        return handles
281
282
    async def unsubscribe(self, handle: Union[int, List[int]]):
        """
        Unsubscribe from datachange or events using the handle returned while subscribing.
        If you delete the subscription, you do not need to unsubscribe.

        Every registry entry matching one of the given handles is removed, not only the first one.
        An empty list of handles is a no-op.

        :param handle: The handle (or list of handles) that was returned when subscribing to the node/nodes
        :raise: whatever `check()` raises on the first result returned by the server
        """
        handles = [handle] if isinstance(handle, int) else list(handle)
        if not handles:
            return
        params = ua.DeleteMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.MonitoredItemIds = handles
        results = await self.server.delete_monitored_items(params)
        # NOTE(review): only the first result is checked, as before - confirm whether
        # the remaining results should be checked as well
        results[0].check()
        removed = set(handles)
        # Collect the keys first: a dict must not change size while it is iterated
        stale = [key for key, item in self._monitored_items.items() if item.server_handle in removed]
        for key in stale:
            del self._monitored_items[key]
298
299
    async def modify_monitored_item(self, handle: int, new_samp_time, new_queuesize=0, mod_filter_val=-1):
        """
        Modify a monitored item.

        :param handle: Handle returned when originally subscribing
        :param new_samp_time: New wanted sample time
        :param new_queuesize: New wanted queuesize, default is 0
        :param mod_filter_val: New deadband filter value. None sends no filter, a negative
            value (the default) re-sends the currently registered filter, any other value
            creates a new DataChangeFilter with that deadband value
        :return: Return a Modify Monitored Item Result
        :raise ValueError: if no registered monitored item has this handle
        """
        # Find the monitored item in the monitored item registry.
        # The default of None avoids a StopIteration, which a coroutine would
        # turn into a RuntimeError before the check below could run.
        item_to_change = next(
            (item for item in self._monitored_items.values() if item.server_handle == handle), None
        )
        if item_to_change is None:
            raise ValueError('The monitored item was not found.')
        if mod_filter_val is None:
            mod_filter = None
        elif mod_filter_val < 0:
            mod_filter = item_to_change.mfilter
        else:
            mod_filter = ua.DataChangeFilter()
            # send notification when status or value change
            mod_filter.Trigger = ua.DataChangeTrigger(1)
            mod_filter.DeadbandType = 1
            # absolute float value or from 0 to 100 for percentage deadband
            mod_filter.DeadbandValue = mod_filter_val
        modif_item = ua.MonitoredItemModifyRequest()
        modif_item.MonitoredItemId = handle
        modif_item.RequestedParameters = self._modify_monitored_item_request(
            new_queuesize, new_samp_time, mod_filter, item_to_change.client_handle
        )
        params = ua.ModifyMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToModify.append(modif_item)
        results = await self.server.modify_monitored_items(params)
        # remember the filter as reported back by the server
        item_to_change.mfilter = results[0].FilterResult
        return results
334
335
    def _modify_monitored_item_request(self, new_queuesize, new_samp_time, mod_filter, client_handle):
        """
        Build the `ua.MonitoringParameters` used to modify a monitored item.
        """
        requested = ua.MonitoringParameters()
        requested.ClientHandle = client_handle
        requested.QueueSize = new_queuesize
        requested.Filter = mod_filter
        requested.SamplingInterval = new_samp_time
        return requested
342
343
    def deadband_monitor(self, var, deadband_val, deadbandtype=1, queuesize=0, attr=ua.AttributeIds.Value):
        """
        Create a subscription using a deadband filter.
        The deadband type defaults to absolute.
        This method is not a coroutine itself: it returns the awaitable produced by
        `_subscribe`, which must be awaited to obtain the handle usable to unsubscribe.

        :param var: Variable to which you want to subscribe
        :param deadband_val: Absolute float value
        :param deadbandtype: Default value is 1 (absolute), change to 2 for percentage deadband
        :param queuesize: Wanted queue size, default is 0
        :param attr: Attribute ID
        """
        dfilter = ua.DataChangeFilter()
        # notify when status or value changes
        dfilter.Trigger = ua.DataChangeTrigger(1)
        dfilter.DeadbandType = deadbandtype
        # absolute float value, or 0 to 100 for a percentage deadband
        dfilter.DeadbandValue = deadband_val
        pending = self._subscribe(var, attr, dfilter, queuesize)
        return pending
361