Passed
Pull Request — master (#45)
by
unknown
01:53
created

Subscription._call_datachange()   A

Complexity

Conditions 5

Size

Total Lines 14
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 13
nop 2
dl 0
loc 14
rs 9.2833
c 0
b 0
f 0
1
"""
2
high level interface to subscriptions
3
"""
4
import asyncio
5
import logging
6
import collections
7
from typing import Union, List, Iterable
8
9
from asyncua import ua
10
from .events import Event, get_filter_from_event_type
11
from .node import Node
12
13
14
class SubHandler:
    """
    Subscription Handler. To receive events from server for a subscription.
    This class is just a sample class. Any class providing these methods can be used.
    """

    def datachange_notification(self, node, val, data):
        """
        Called for every datachange notification from server.

        :param node: Node the monitored item belongs to
        :param val: New value carried by the notification
        :param data: `DataChangeNotif` wrapping the subscription data and the raw monitored item
        """
        pass

    def event_notification(self, event):
        """
        Called for every event notification from server.

        :param event: Event object built from the received event fields
        """
        pass

    def status_change_notification(self, status):
        """
        Called for every status change notification from server.

        :param status: Status carried by the status change notification
        """
        pass
37
38
39
class SubscriptionItemData:
    """
    To store useful data from a monitored item.
    All attributes start as None and are filled in when the monitored item is created.
    """

    def __init__(self):
        # Node the monitored item was created for
        self.node = None
        # Handle chosen by the client; used as key in the subscription registry
        self.client_handle = None
        # Monitored item id assigned by the server (None until the server answered)
        self.server_handle = None
        # Attribute id that is being monitored
        self.attribute = None
        # Monitoring filter attached to the item, if any
        self.mfilter = None
50
51
52
class DataChangeNotif:
    """
    To be sent to clients for every datachange notification from server.

    :param subscription_data: Data stored by the subscription for the monitored item
    :param monitored_item: Raw monitored item notification received from the server
    """

    def __init__(self, subscription_data, monitored_item):
        self.monitored_item = monitored_item
        self.subscription_data = subscription_data

    def __str__(self):
        return f"DataChangeNotification({self.subscription_data}, {self.monitored_item})"

    # Same text is used for debugging output
    __repr__ = __str__
65
66
67
class Subscription:
    """
    Subscription object returned by Server or Client objects.
    The object represents a subscription to an opc-ua server.
    This is a high level class, especially the `subscribe_data_change` and `subscribe_events` methods.
    If more control is necessary look at code and/or use the `create_monitored_items` method.

    :param server: `InternalSession` or `UAClient`
    :param params: Parameters sent to the server when the subscription is created
    :param handler: Object receiving the notifications (see `SubHandler`)
    :param loop: Event loop to use, defaults to `asyncio.get_event_loop()`
    """

    def __init__(self, server, params: ua.CreateSubscriptionParameters, handler: SubHandler, loop=None):
        self.loop = loop or asyncio.get_event_loop()
        self.logger = logging.getLogger(__name__)
        self.server = server
        # Counter incremented for every monitored item request to get a fresh client handle
        self._client_handle = 200
        self._handler: SubHandler = handler
        self.parameters: ua.CreateSubscriptionParameters = params  # move to data class
        # Registry of monitored items: client handle -> SubscriptionItemData
        self._monitored_items = {}
        # Set by `init` once the server has created the subscription
        self.subscription_id = None

    async def init(self):
        """
        Create the subscription on the server and remember the id it returned.
        """
        response = await self.server.create_subscription(self.parameters, callback=self.publish_callback)
        self.subscription_id = response.SubscriptionId  # move to data class
        self.logger.info('Subscription created %s', self.subscription_id)

    def publish_callback(self, publish_result: ua.PublishResult):
        """
        Handle `PublishResult` callback.
        Every notification is dispatched to the matching handler method depending on its type.
        """
        self.logger.info("Publish callback called with result: %s", publish_result)
        if publish_result.NotificationMessage.NotificationData is not None:
            for notif in publish_result.NotificationMessage.NotificationData:
                if isinstance(notif, ua.DataChangeNotification):
                    self._call_datachange(notif)
                elif isinstance(notif, ua.EventNotificationList):
                    self._call_event(notif)
                elif isinstance(notif, ua.StatusChangeNotification):
                    self._call_status(notif)
                else:
                    self.logger.warning("Notification type not supported yet for notification %s", notif)
        else:
            self.logger.warning("NotificationMessage is None.")

    async def delete(self):
        """
        Delete subscription on server. This is automatically done by Client and Server classes on exit.
        """
        results = await self.server.delete_subscriptions([self.subscription_id])
        results[0].check()

    def _call_datachange(self, datachange: ua.DataChangeNotification):
        """
        Forward every monitored item of a datachange notification to the handler.
        Items with an unknown client handle are skipped, handler exceptions are logged.
        """
        for item in datachange.MonitoredItems:
            if item.ClientHandle not in self._monitored_items:
                self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle)
                continue
            data = self._monitored_items[item.ClientHandle]
            if hasattr(self._handler, "datachange_notification"):
                event_data = DataChangeNotif(data, item)
                try:
                    self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
                except Exception:
                    # A failing user handler must not break the processing of further notifications
                    self.logger.exception("Exception calling data change handler")
            else:
                self.logger.error("DataChange subscription created but handler has no datachange_notification method")

    def _call_event(self, eventlist: ua.EventNotificationList):
        """
        Convert every received event to an `Event` object and forward it to the handler.
        Events with an unknown client handle are skipped, handler exceptions are logged.
        """
        for event in eventlist.Events:
            if event.ClientHandle not in self._monitored_items:
                # Same policy as for data changes: warn and go on instead of raising KeyError
                self.logger.warning("Received a notification for unknown handle: %s", event.ClientHandle)
                continue
            data = self._monitored_items[event.ClientHandle]
            result = Event.from_event_fields(data.mfilter.SelectClauses, event.EventFields)
            result.server_handle = data.server_handle
            if hasattr(self._handler, "event_notification"):
                try:
                    self._handler.event_notification(result)
                except Exception:
                    self.logger.exception("Exception calling event handler")
            else:
                self.logger.error("Event subscription created but handler has no event_notification method")

    def _call_status(self, status: ua.StatusChangeNotification):
        """
        Forward the status of a status change notification to the handler.
        """
        if not hasattr(self._handler, "status_change_notification"):
            self.logger.error("Status change received but handler has no status_change_notification method")
            return
        try:
            self._handler.status_change_notification(status.Status)
        except Exception:
            self.logger.exception("Exception calling status change handler")

    async def subscribe_data_change(self,
                                    nodes: Union[Node, Iterable[Node]],
                                    attr=ua.AttributeIds.Value,
                                    queuesize=0) -> Union[int, List[Union[int, ua.StatusCode]]]:
        """
        Subscribe to data change events of one or multiple nodes.
        The default attribute used for the subscription is `Value`.
        Return value is a handle which can be used to modify/cancel the subscription.
        The handle is an integer value for single Nodes. If the creation of the subscription fails an
        `UaStatusCodeError` is raised.
        If multiple Nodes are supplied, a List of integers or ua.StatusCode objects is returned. A list of
        StatusCode objects are returned to indicate that the subscription has failed (no exception will be
        raised in this case).
        If more control is necessary the `create_monitored_items` method can be used directly.

        :param nodes: One Node or an Iterable of Nodes
        :param attr: The Node attribute you want to subscribe to
        :param queuesize: 0 or 1 for default queue size (shall be 1 - no queuing), n for FIFO queue
        :return: Handle for changing/cancelling of the subscription
        """
        return await self._subscribe(nodes, attr, queuesize=queuesize)

    async def subscribe_events(self,
                               sourcenode: Node = ua.ObjectIds.Server,
                               evtypes=ua.ObjectIds.BaseEventType,
                               evfilter=None,
                               queuesize=0) -> int:
        """
        Subscribe to events from a node. Default node is Server node.
        In most servers the server node is the only one you can subscribe to.
        If evtypes is not provided, evtype defaults to BaseEventType.
        If evtypes is a list or tuple of custom event types, the events will be filtered to the supplied types.
        A handle (integer value) is returned which can be used to modify/cancel the subscription.

        :param sourcenode: Node (or node id) to receive events from
        :param evtypes: One event type or a list/tuple of event types, only used when no `evfilter` is given
        :param evfilter: Ready made event filter; when None a filter is built from `evtypes`
        :param queuesize: 0 for default queue size, 1 for minimum queue size, n for FIFO queue,
        MaxUInt32 for max queue size
        :return: Handle for changing/cancelling of the subscription
        """
        sourcenode = Node(self.server, sourcenode)
        if evfilter is None:
            if not isinstance(evtypes, (list, tuple)):
                evtypes = [evtypes]
            evtypes = [Node(self.server, evtype) for evtype in evtypes]
            evfilter = await get_filter_from_event_type(evtypes)
        return await self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter, queuesize=queuesize)

    async def _subscribe(self, nodes: Union[Node, Iterable[Node]],
                         attr, mfilter=None, queuesize=0) -> Union[int, List[Union[int, ua.StatusCode]]]:
        """
        Private low level method for subscribing.

        :param nodes: One Node or an Iterable of Nodes.
        :param attr: ua.AttributeId
        :param mfilter: MonitoringFilter
        :param queuesize: queue size
        :return: Integer handle or if multiple Nodes were given a List of Integer handles/ua.StatusCode
        """
        # `collections.Iterable` was removed in Python 3.10, the ABC lives in `collections.abc`
        from collections.abc import Iterable as IterableABC

        is_list = isinstance(nodes, IterableABC)
        nodes = list(nodes) if is_list else [nodes]
        # Create List of MonitoredItemCreateRequest
        mirs = [self._make_monitored_item_request(node, attr, mfilter, queuesize) for node in nodes]
        # Await MonitoredItemCreateResult
        mids = await self.create_monitored_items(mirs)
        if is_list:
            # Return results for multiple nodes
            return mids
        # Check and return result for single node (raise `UaStatusCodeError` if subscription failed)
        if isinstance(mids[0], ua.StatusCode):
            mids[0].check()
        return mids[0]

    def _make_monitored_item_request(self, node: Node, attr, mfilter, queuesize) -> ua.MonitoredItemCreateRequest:
        """
        Build the create request for one monitored item, using a fresh client handle.

        :param node: Node to monitor
        :param attr: Attribute id to monitor
        :param mfilter: Monitoring filter, ignored when falsy
        :param queuesize: Requested queue size
        """
        rv = ua.ReadValueId()
        rv.NodeId = node.nodeid
        rv.AttributeId = attr
        # rv.IndexRange //We leave it null, then the entire array is returned
        mparams = ua.MonitoringParameters()
        self._client_handle += 1
        mparams.ClientHandle = self._client_handle
        # The sampling interval follows the publishing interval requested for the subscription
        mparams.SamplingInterval = self.parameters.RequestedPublishingInterval
        mparams.QueueSize = queuesize
        mparams.DiscardOldest = True
        if mfilter:
            mparams.Filter = mfilter
        mir = ua.MonitoredItemCreateRequest()
        mir.ItemToMonitor = rv
        mir.MonitoringMode = ua.MonitoringMode.Reporting
        mir.RequestedParameters = mparams
        return mir

    async def create_monitored_items(self, monitored_items) -> List[Union[int, ua.StatusCode]]:
        """
        low level method to have full control over subscription parameters.
        Client handle must be unique since it will be used as key for internal registration of data.

        :param monitored_items: List of `ua.MonitoredItemCreateRequest`
        :return: For every request, in order, the monitored item id or the bad `ua.StatusCode`
        """
        params = ua.CreateMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToCreate = monitored_items
        params.TimestampsToReturn = ua.TimestampsToReturn.Both
        # insert monitored item into map to avoid notification arrive before result return
        # server_handle is left as None in purpose as we don't get it yet.
        for mi in monitored_items:
            data = SubscriptionItemData()
            data.client_handle = mi.RequestedParameters.ClientHandle
            data.node = Node(self.server, mi.ItemToMonitor.NodeId)
            data.attribute = mi.ItemToMonitor.AttributeId
            # TODO: Either use the filter from request or from response.
            #  Here it uses from request, in modify it uses from response
            data.mfilter = mi.RequestedParameters.Filter
            self._monitored_items[mi.RequestedParameters.ClientHandle] = data
        results = await self.server.create_monitored_items(params)
        mids = []
        # process result, add server_handle, or remove it if failed
        for idx, result in enumerate(results):
            mi = params.ItemsToCreate[idx]
            if not result.StatusCode.is_good():
                del self._monitored_items[mi.RequestedParameters.ClientHandle]
                mids.append(result.StatusCode)
                continue
            data = self._monitored_items[mi.RequestedParameters.ClientHandle]
            data.server_handle = result.MonitoredItemId
            mids.append(result.MonitoredItemId)
        return mids

    async def unsubscribe(self, handle: Union[int, List[int]]):
        """
        Unsubscribe from datachange or events using the handle returned while subscribing.
        If you delete the subscription, you do not need to unsubscribe.
        Every item the server reports as deleted is removed from the internal registry; afterwards
        the first failed deletion (if any) is raised through its status code `check()`.

        :param handle: The handle that was returned when subscribing to the node/nodes
        """
        handles = [handle] if isinstance(handle, int) else list(handle)
        if not handles:
            # Nothing to delete, do not bother the server
            return
        params = ua.DeleteMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.MonitoredItemIds = handles
        results = await self.server.delete_monitored_items(params)
        # Results are paired with the requested handles by position
        deleted = {mid for mid, status in zip(handles, results) if status.is_good()}
        # Iterate over a copy since entries are removed while walking the registry
        for client_handle, item in list(self._monitored_items.items()):
            if item.server_handle in deleted:
                del self._monitored_items[client_handle]
        for status in results:
            status.check()

    async def modify_monitored_item(self, handle: int, new_samp_time, new_queuesize=0, mod_filter_val=-1):
        """
        Modify a monitored item.

        :param handle: Handle returned when originally subscribing
        :param new_samp_time: New wanted sample time
        :param new_queuesize: New wanted queuesize, default is 0
        :param mod_filter_val: New deadband filter value. None sends no filter, a negative value
        (the default) re-sends the filter currently stored for the item
        :return: Return a Modify Monitored Item Result
        :raises ValueError: If no monitored item with this handle is registered
        """
        # Find the monitored item in the monitored item registry.
        # The default is required: a bare `next()` would raise StopIteration instead of reaching the check below.
        item_to_change = next(
            (item for item in self._monitored_items.values() if item.server_handle == handle), None
        )
        if item_to_change is None:
            raise ValueError('The monitored item was not found.')
        if mod_filter_val is None:
            mod_filter = None
        elif mod_filter_val < 0:
            mod_filter = item_to_change.mfilter
        else:
            mod_filter = ua.DataChangeFilter()
            # send notification when status or value change
            mod_filter.Trigger = ua.DataChangeTrigger(1)
            mod_filter.DeadbandType = 1
            # absolute float value or from 0 to 100 for percentage deadband
            mod_filter.DeadbandValue = mod_filter_val
        modif_item = ua.MonitoredItemModifyRequest()
        modif_item.MonitoredItemId = handle
        modif_item.RequestedParameters = self._modify_monitored_item_request(
            new_queuesize, new_samp_time, mod_filter, item_to_change.client_handle
        )
        params = ua.ModifyMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToModify.append(modif_item)
        results = await self.server.modify_monitored_items(params)
        # The registry keeps the filter result returned by the server
        item_to_change.mfilter = results[0].FilterResult
        return results

    def _modify_monitored_item_request(self, new_queuesize, new_samp_time, mod_filter, client_handle):
        """
        Build the monitoring parameters used to modify a monitored item.
        """
        req_params = ua.MonitoringParameters()
        req_params.ClientHandle = client_handle
        req_params.QueueSize = new_queuesize
        req_params.Filter = mod_filter
        req_params.SamplingInterval = new_samp_time
        return req_params

    async def deadband_monitor(self, var, deadband_val, deadbandtype=1, queuesize=0, attr=ua.AttributeIds.Value):
        """
        Method to create a subscription with a Deadband Value.
        Default deadband value type is absolute.
        Return a handle which can be used to unsubscribe

        :param var: Variable to which you want to subscribe
        :param deadband_val: Absolute float value
        :param deadbandtype: Default value is 1 (absolute), change to 2 for percentage deadband
        :param queuesize: Wanted queue size, default is 0
        :param attr: Attribute ID
        """
        deadband_filter = ua.DataChangeFilter()
        # send notification when status or value change
        deadband_filter.Trigger = ua.DataChangeTrigger(1)
        deadband_filter.DeadbandType = deadbandtype
        # absolute float value or from 0 to 100 for percentage deadband
        deadband_filter.DeadbandValue = deadband_val
        return await self._subscribe(var, attr, deadband_filter, queuesize)
362