Completed
Push — master ( 9925e7...6c504f )
by Olivier
02:21
created

Subscription.set_monitoring_mode()   A

Complexity

Conditions 2

Size

Total Lines 18
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 9
nop 2
dl 0
loc 18
rs 9.95
c 0
b 0
f 0
1
"""
2
high level interface to subscriptions
3
"""
4
import asyncio
5
import logging
6
import collections.abc
7
from typing import Union, List, Iterable
8
9
from asyncua import ua
10
from .events import Event, get_filter_from_event_type
11
from .node import Node
12
13
14
class SubHandler:
15
    """
16
    Subscription Handler. To receive events from server for a subscription
17
    This class is just a sample class. Whatever class having these methods can be used
18
    """
19
20
    def datachange_notification(self, node, val, data):
21
        """
22
        called for every datachange notification from server
23
        """
24
        pass
25
26
    def event_notification(self, event):
27
        """
28
        called for every event notification from server
29
        """
30
        pass
31
32
    def status_change_notification(self, status):
33
        """
34
        called for every status change notification from server
35
        """
36
        pass
37
38
39
class SubscriptionItemData:
40
    """
41
    To store useful data from a monitored item.
42
    """
43
44
    def __init__(self):
45
        self.node = None
46
        self.client_handle = None
47
        self.server_handle = None
48
        self.attribute = None
49
        self.mfilter = None
50
51
52
class DataChangeNotif:
53
    """
54
    To be send to clients for every datachange notification from server.
55
    """
56
57
    def __init__(self, subscription_data, monitored_item):
58
        self.monitored_item = monitored_item
59
        self.subscription_data = subscription_data
60
61
    def __str__(self):
62
        return f"DataChangeNotification({self.subscription_data}, {self.monitored_item})"
63
64
    __repr__ = __str__
65
66
67
class Subscription:
68
    """
69
    Subscription object returned by Server or Client objects.
70
    The object represent a subscription to an opc-ua server.
71
    This is a high level class, especially `subscribe_data_change` and `subscribe_events methods`.
72
    If more control is necessary look at code and/or use `create_monitored_items method`.
73
    :param server: `InternalSession` or `UAClient`
74
    """
75
76
    def __init__(self, server, params: ua.CreateSubscriptionParameters, handler: SubHandler, loop=None):
77
        self.loop = loop or asyncio.get_event_loop()
78
        self.logger = logging.getLogger(__name__)
79
        self.server = server
80
        self._client_handle = 200
81
        self._handler: SubHandler = handler
82
        self.parameters: ua.CreateSubscriptionParameters = params  # move to data class
83
        self._monitored_items = {}
84
        self.subscription_id = None
85
86
    async def init(self):
87
        response = await self.server.create_subscription(self.parameters, callback=self.publish_callback)
88
        self.subscription_id = response.SubscriptionId  # move to data class
89
        self.logger.info('Subscription created %s', self.subscription_id)
90
91
    async def publish_callback(self, publish_result: ua.PublishResult):
92
        """
93
        Handle `PublishResult` callback.
94
        """
95
        self.logger.info("Publish callback called with result: %s", publish_result)
96
        if publish_result.NotificationMessage.NotificationData is not None:
97
            for notif in publish_result.NotificationMessage.NotificationData:
98
                if isinstance(notif, ua.DataChangeNotification):
99
                    await self._call_datachange(notif)
100
                elif isinstance(notif, ua.EventNotificationList):
101
                    await self._call_event(notif)
102
                elif isinstance(notif, ua.StatusChangeNotification):
103
                    await self._call_status(notif)
104
                else:
105
                    self.logger.warning("Notification type not supported yet for notification %s", notif)
106
        else:
107
            self.logger.warning("NotificationMessage is None.")
108
109
    async def delete(self):
110
        """
111
        Delete subscription on server. This is automatically done by Client and Server classes on exit.
112
        """
113
        results = await self.server.delete_subscriptions([self.subscription_id])
114
        results[0].check()
115
116
    async def _call_datachange(self, datachange: ua.DataChangeNotification):
117
        for item in datachange.MonitoredItems:
118
            if item.ClientHandle not in self._monitored_items:
119
                self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle)
120
                continue
121
            data = self._monitored_items[item.ClientHandle]
122
            if hasattr(self._handler, "datachange_notification"):
123
                event_data = DataChangeNotif(data, item)
124
                try:
125
                    if asyncio.iscoroutinefunction(self._handler.datachange_notification):
126
                        await self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
127
                    else:
128
                        self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
129
                except Exception:
130
                    self.logger.exception("Exception calling data change handler")
131
            else:
132
                self.logger.error("DataChange subscription created but handler has no datachange_notification method")
133
134
    async def _call_event(self, eventlist: ua.EventNotificationList):
135
        for event in eventlist.Events:
136
            data = self._monitored_items[event.ClientHandle]
137
            result = Event.from_event_fields(data.mfilter.SelectClauses, event.EventFields)
138
            result.server_handle = data.server_handle
139
            if hasattr(self._handler, "event_notification"):
140
                try:
141
                    if asyncio.iscoroutinefunction(self._handler.event_notification):
142
                        await self._handler.event_notification(result)
143
                    else:
144
                        self._handler.event_notification(result)
145
                except Exception:
146
                    self.logger.exception("Exception calling event handler")
147
            else:
148
                self.logger.error("Event subscription created but handler has no event_notification method")
149
150
    async def _call_status(self, status: ua.StatusChangeNotification):
151
        try:
152
            if asyncio.iscoroutinefunction(self._handler.status_change_notification):
153
                await self._handler.status_change_notification(status.Status)
154
            else:
155
                self._handler.status_change_notification(status.Status)
156
        except Exception:
157
            self.logger.exception("Exception calling status change handler")
158
159
    async def subscribe_data_change(self,
160
                                    nodes: Union[Node, Iterable[Node]],
161
                                    attr=ua.AttributeIds.Value,
162
                                    queuesize=0,
163
                                    monitoring=ua.MonitoringMode.Reporting,
164
                                   ) -> Union[int, List[Union[int, ua.StatusCode]]]:
165
        """
166
        Subscribe to data change events of one or multiple nodes.
167
        The default attribute used for the subscription is `Value`.
168
        Return value is a handle which can be used to modify/cancel the subscription.
169
        The handle is an integer value for single Nodes. If the creation of the subscription fails an
170
        `UaStatusCodeError` is raised.
171
        If multiple Nodes are supplied, a List of integers or ua.StatusCode objects is returned. A list of
172
        StatusCode objects are returned to indicate that the subscription has failed (no exception will be
173
        raised in this case).
174
        If more control is necessary the `create_monitored_items` method can be used directly.
175
176
        :param nodes: One Node or an Iterable of Nodes
177
        :param attr: The Node attribute you want to subscribe to
178
        :param queuesize: 0 or 1 for default queue size (shall be 1 - no queuing), n for FIFO queue
179
        :return: Handle for changing/cancelling of the subscription
180
        """
181
        return await self._subscribe(
182
            nodes, attr, queuesize=queuesize, monitoring=monitoring
183
        )
184
185
    async def subscribe_events(self,
186
                               sourcenode: Node = ua.ObjectIds.Server,
187
                               evtypes=ua.ObjectIds.BaseEventType,
188
                               evfilter=None,
189
                               queuesize=0) -> int:
190
        """
191
        Subscribe to events from a node. Default node is Server node.
192
        In most servers the server node is the only one you can subscribe to.
193
        If evtypes is not provided, evtype defaults to BaseEventType.
194
        If evtypes is a list or tuple of custom event types, the events will be filtered to the supplied types.
195
        A handle (integer value) is returned which can be used to modify/cancel the subscription.
196
197
        :param sourcenode:
198
        :param evtypes:
199
        :param evfilter:
200
        :param queuesize: 0 for default queue size, 1 for minimum queue size, n for FIFO queue,
201
        MaxUInt32 for max queue size
202
        :return: Handle for changing/cancelling of the subscription
203
        """
204
        sourcenode = Node(self.server, sourcenode)
205
        if evfilter is None:
206
            if not type(evtypes) in (list, tuple):
207
                evtypes = [evtypes]
208
            evtypes = [Node(self.server, evtype) for evtype in evtypes]
209
            evfilter = await get_filter_from_event_type(evtypes)
210
        return await self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter, queuesize=queuesize)
211
212
    async def _subscribe(self,
213
                         nodes: Union[Node, Iterable[Node]],
214
                         attr=ua.AttributeIds.Value,
215
                         mfilter=None,
216
                         queuesize=0,
217
                         monitoring=ua.MonitoringMode.Reporting,
218
                        ) -> Union[int, List[Union[int, ua.StatusCode]]]:
219
        """
220
        Private low level method for subscribing.
221
        :param nodes: One Node or an Iterable og Nodes.
222
        :param attr: ua.AttributeId
223
        :param mfilter: MonitoringFilter
224
        :param queuesize: queue size
225
        :param monitoring: ua.MonitoringMode
226
        :return: Integer handle or if multiple Nodes were given a List of Integer handles/ua.StatusCode
227
        """
228
        is_list = True
229
        if isinstance(nodes, collections.abc.Iterable):
230
            nodes = list(nodes)
231
        else:
232
            nodes = [nodes]
233
            is_list = False
234
        # Create List of MonitoredItemCreateRequest
235
        mirs = []
236
        for node in nodes:
237
            mir = self._make_monitored_item_request(
238
                node, attr, mfilter, queuesize, monitoring
239
            )
240
            mirs.append(mir)
241
        # Await MonitoredItemCreateResult
242
        mids = await self.create_monitored_items(mirs)
243
        if is_list:
244
            # Return results for multiple nodes
245
            return mids
246
        # Check and return result for single node (raise `UaStatusCodeError` if subscription failed)
247
        if type(mids[0]) == ua.StatusCode:
248
            mids[0].check()
249
        return mids[0]
250
251
    def _make_monitored_item_request(self,
252
                                     node: Node,
253
                                     attr,
254
                                     mfilter,
255
                                     queuesize,
256
                                     monitoring) -> ua.MonitoredItemCreateRequest:
257
        rv = ua.ReadValueId()
258
        rv.NodeId = node.nodeid
259
        rv.AttributeId = attr
260
        # rv.IndexRange //We leave it null, then the entire array is returned
261
        mparams = ua.MonitoringParameters()
262
        self._client_handle += 1
263
        mparams.ClientHandle = self._client_handle
264
        mparams.SamplingInterval = self.parameters.RequestedPublishingInterval
265
        mparams.QueueSize = queuesize
266
        mparams.DiscardOldest = True
267
        if mfilter:
268
            mparams.Filter = mfilter
269
        mir = ua.MonitoredItemCreateRequest()
270
        mir.ItemToMonitor = rv
271
        mir.MonitoringMode = monitoring
272
        mir.RequestedParameters = mparams
273
        return mir
274
275
    async def create_monitored_items(self, monitored_items) -> List[Union[int, ua.StatusCode]]:
276
        """
277
        low level method to have full control over subscription parameters.
278
        Client handle must be unique since it will be used as key for internal registration of data.
279
        """
280
        params = ua.CreateMonitoredItemsParameters()
281
        params.SubscriptionId = self.subscription_id
282
        params.ItemsToCreate = monitored_items
283
        params.TimestampsToReturn = ua.TimestampsToReturn.Both
284
        # insert monitored item into map to avoid notification arrive before result return
285
        # server_handle is left as None in purpose as we don't get it yet.
286
        for mi in monitored_items:
287
            data = SubscriptionItemData()
288
            data.client_handle = mi.RequestedParameters.ClientHandle
289
            data.node = Node(self.server, mi.ItemToMonitor.NodeId)
290
            data.attribute = mi.ItemToMonitor.AttributeId
291
            # TODO: Either use the filter from request or from response.
292
            #  Here it uses from request, in modify it uses from response
293
            data.mfilter = mi.RequestedParameters.Filter
294
            self._monitored_items[mi.RequestedParameters.ClientHandle] = data
295
        results = await self.server.create_monitored_items(params)
296
        mids = []
297
        # process result, add server_handle, or remove it if failed
298
        for idx, result in enumerate(results):
299
            mi = params.ItemsToCreate[idx]
300
            if not result.StatusCode.is_good():
301
                del self._monitored_items[mi.RequestedParameters.ClientHandle]
302
                mids.append(result.StatusCode)
303
                continue
304
            data = self._monitored_items[mi.RequestedParameters.ClientHandle]
305
            data.server_handle = result.MonitoredItemId
306
            mids.append(result.MonitoredItemId)
307
        return mids
308
309
    async def unsubscribe(self, handle: Union[int, List[int]]):
310
        """
311
        Unsubscribe from datachange or events using the handle returned while subscribing.
312
        If you delete the subscription, you do not need to unsubscribe.
313
        :param handle: The handle that was returned when subscribing to the node/nodes
314
        """
315
        handles = [handle] if type(handle) is int else handle
316
        if not handles:
317
            return
318
        params = ua.DeleteMonitoredItemsParameters()
319
        params.SubscriptionId = self.subscription_id
320
        params.MonitoredItemIds = handles
321
        results = await self.server.delete_monitored_items(params)
322
        results[0].check()
323
        handle_map = {v.server_handle: k for k, v in self._monitored_items.items()}
324
        for handle in handles:
325
            if handle in handle_map:
326
                del self._monitored_items[handle_map[handle]]
327
328
    async def modify_monitored_item(self, handle: int, new_samp_time, new_queuesize=0, mod_filter_val=-1):
329
        """
330
        Modify a monitored item.
331
        :param handle: Handle returned when originally subscribing
332
        :param new_samp_time: New wanted sample time
333
        :param new_queuesize: New wanted queuesize, default is 0
334
        :param mod_filter_val: New deadband filter value
335
        :return: Return a Modify Monitored Item Result
336
        """
337
        # Find the monitored item in the monitored item registry.
338
        item_to_change = next(item for item in self._monitored_items.values() if item.server_handle == handle)
339
        if not item_to_change:
340
            raise ValueError('The monitored item was not found.')
341
        if mod_filter_val is None:
342
            mod_filter = None
343
        elif mod_filter_val < 0:
344
            mod_filter = item_to_change.mfilter
345
        else:
346
            mod_filter = ua.DataChangeFilter()
347
            # send notification when status or value change
348
            mod_filter.Trigger = ua.DataChangeTrigger(1)
349
            mod_filter.DeadbandType = 1
350
            # absolute float value or from 0 to 100 for percentage deadband
351
            mod_filter.DeadbandValue = mod_filter_val
352
        modif_item = ua.MonitoredItemModifyRequest()
353
        modif_item.MonitoredItemId = handle
354
        modif_item.RequestedParameters = self._modify_monitored_item_request(
355
            new_queuesize, new_samp_time, mod_filter, item_to_change.client_handle
356
        )
357
        params = ua.ModifyMonitoredItemsParameters()
358
        params.SubscriptionId = self.subscription_id
359
        params.ItemsToModify.append(modif_item)
360
        results = await self.server.modify_monitored_items(params)
361
        item_to_change.mfilter = results[0].FilterResult
362
        return results
363
364
    def _modify_monitored_item_request(self, new_queuesize, new_samp_time, mod_filter, client_handle):
365
        req_params = ua.MonitoringParameters()
366
        req_params.ClientHandle = client_handle
367
        req_params.QueueSize = new_queuesize
368
        req_params.Filter = mod_filter
369
        req_params.SamplingInterval = new_samp_time
370
        return req_params
371
372
    def deadband_monitor(self, var, deadband_val, deadbandtype=1, queuesize=0, attr=ua.AttributeIds.Value):
373
        """
374
        Method to create a subscription with a Deadband Value.
375
        Default deadband value type is absolute.
376
        Return a handle which can be used to unsubscribe
377
        :param var: Variable to which you want to subscribe
378
        :param deadband_val: Absolute float value
379
        :param deadbandtype: Default value is 1 (absolute), change to 2 for percentage deadband
380
        :param queuesize: Wanted queue size, default is 1
381
        :param attr: Attribute ID
382
        """
383
        deadband_filter = ua.DataChangeFilter()
384
        # send notification when status or value change
385
        deadband_filter.Trigger = ua.DataChangeTrigger(1)
386
        deadband_filter.DeadbandType = deadbandtype
387
        # absolute float value or from 0 to 100 for percentage deadband
388
        deadband_filter.DeadbandValue = deadband_val
389
        return self._subscribe(var, attr, deadband_filter, queuesize)
390
391
    async def set_monitoring_mode(self, monitoring: ua.MonitoringMode) -> ua.uatypes.StatusCode:
392
        """
393
        The monitoring mode parameter is used
394
        to enable/disable the sampling of MonitoredItems
395
        (Samples don't queue on the server side)
396
397
        :param monitoring: The monitoring mode to apply
398
        :return: Return a Set Monitoring Mode Result
399
        """
400
        node_handles = []
401
        for mi in self._monitored_items.values():
402
            node_handles.append(mi.server_handle)
403
404
        params = ua.SetMonitoringModeParameters()
405
        params.SubscriptionId = self.subscription_id
406
        params.MonitoredItemIds = node_handles
407
        params.MonitoringMode = monitoring
408
        return await self.server.set_monitoring_mode(params)
409
410
    async def set_publishing_mode(self, publishing: bool) -> ua.uatypes.StatusCode:
411
        """
412
        Disable publishing of NotificationMessages for the subscription,
413
        but doesn't discontinue the sending of keep-alive Messages,
414
        nor change the monitoring mode.
415
416
        :param publishing: The publishing mode to apply
417
        :return: Return a Set Publishing Mode Result
418
        """
419
        self.logger.info("set_publishing_mode")
420
        params = ua.SetPublishingModeParameters()
421
        params.SubscriptionIds = [self.subscription_id]
422
        params.PublishingEnabled = publishing
423
        result = await self.server.set_publishing_mode(params)
424
        if result[0].is_good():
425
            self.parameters.PublishingEnabled = publishing
426
        return result
427