Passed
Push — master ( d285f3...459165 )
by Olivier
02:30
created

asyncua.common.subscription   C

Complexity

Total Complexity 56

Size/Duplication

Total Lines 341
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 219
dl 0
loc 341
rs 5.5199
c 0
b 0
f 0
wmc 56

23 Methods

Rating   Name   Duplication   Size   Complexity  
A SubHandler.event_notification() 0 5 1
A Subscription._make_monitored_item_request() 0 18 2
B Subscription.publish_callback() 0 22 7
A Subscription.subscribe_events() 0 16 3
A Subscription.create_monitored_items() 0 32 4
A SubHandler.data_change() 0 5 1
A Subscription._subscribe() 0 17 5
B Subscription._call_datachange() 0 20 7
A Subscription.subscribe_data_change() 0 9 1
A Subscription.__init__() 0 8 1
A DataChangeNotif.__init__() 0 3 1
A Subscription.unsubscribe() 0 14 3
A SubscriptionItemData.__init__() 0 6 1
A Subscription._call_status() 0 5 2
A SubHandler.datachange_notification() 0 5 1
A SubHandler.status_change_notification() 0 5 1
B Subscription._call_event() 0 17 6
A DataChangeNotif.__str__() 0 2 1
A Subscription.init() 0 7 1
A Subscription.delete() 0 6 1
A Subscription._modify_monitored_item_request() 0 7 1
A Subscription.deadband_monitor() 0 16 1
A Subscription.modify_monitored_item() 0 33 4

How to fix   Complexity   

Complexity

Complex classes like asyncua.common.subscription often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
high level interface to subscriptions
3
"""
4
import asyncio
5
import logging
6
import collections
7
8
from asyncua import ua
9
from .events import Event, get_filter_from_event_type
10
from .node import Node
11
12
13
class SubHandler:
14
    """
15
    Subscription Handler. To receive events from server for a subscription
16
    This class is just a sample class. Whatever class having these methods can be used
17
    """
18
19
    def data_change(self, handle, node, val, attr):
20
        """
21
        Deprecated, use datachange_notification
22
        """
23
        pass
24
25
    def datachange_notification(self, node, val, data):
26
        """
27
        called for every datachange notification from server
28
        """
29
        pass
30
31
    def event_notification(self, event):
32
        """
33
        called for every event notification from server
34
        """
35
        pass
36
37
    def status_change_notification(self, status):
38
        """
39
        called for every status change notification from server
40
        """
41
        pass
42
43
44
class SubscriptionItemData:
45
    """
46
    To store useful data from a monitored item
47
    """
48
    def __init__(self):
49
        self.node = None
50
        self.client_handle = None
51
        self.server_handle = None
52
        self.attribute = None
53
        self.mfilter = None
54
55
56
class DataChangeNotif:
57
    """
58
    To be send to clients for every datachange notification from server
59
    """
60
    def __init__(self, subscription_data, monitored_item):
61
        self.monitored_item = monitored_item
62
        self.subscription_data = subscription_data
63
64
    def __str__(self):
65
        return "DataChangeNotification({0}, {1})".format(self.subscription_data, self.monitored_item)
66
    __repr__ = __str__
67
68
69
class Subscription:
70
    """
71
    Subscription object returned by Server or Client objects.
72
    The object represent a subscription to an opc-ua server.
73
    This is a high level class, especially subscribe_data_change
74
    and subscribe_events methods. If more control is necessary look at
75
    code and/or use create_monitored_items method.
76
    :param server: `InternalSession` or `UAClient`
77
    """
78
79
    def __init__(self, server, params, handler):
80
        self.logger = logging.getLogger(__name__)
81
        self.server = server
82
        self._client_handle = 200
83
        self._handler = handler
84
        self.parameters = params  # move to data class
85
        self._monitored_items = {}
86
        self.subscription_id = None
87
88
    async def init(self):
89
        response = await self.server.create_subscription(self.parameters, self.publish_callback)
90
        self.subscription_id = response.SubscriptionId  # move to data class
91
        self.logger.info('Subscription created %s', self.subscription_id)
92
        # Send a publish request so the server has one in its queue
93
        # Servers should always be able to handle at least on extra publish request per subscriptions
94
        await self.server.publish()
95
96
    async def delete(self):
97
        """
98
        Delete subscription on server. This is automatically done by Client and Server classes on exit
99
        """
100
        results = await self.server.delete_subscriptions([self.subscription_id])
101
        results[0].check()
102
103
    async def publish_callback(self, publishresult: ua.PublishResult):
104
        self.logger.info("Publish callback called with result: %s", publishresult)
105
        while self.subscription_id is None:
106
            await asyncio.sleep(0.01)
107
108
        if publishresult.NotificationMessage.NotificationData is not None:
109
            for notif in publishresult.NotificationMessage.NotificationData:
110
                if isinstance(notif, ua.DataChangeNotification):
111
                    self._call_datachange(notif)
112
                elif isinstance(notif, ua.EventNotificationList):
113
                    self._call_event(notif)
114
                elif isinstance(notif, ua.StatusChangeNotification):
115
                    self._call_status(notif)
116
                else:
117
                    self.logger.warning("Notification type not supported yet for notification %s", notif)
118
        else:
119
            self.logger.warning("NotificationMessage is None.")
120
121
        ack = ua.SubscriptionAcknowledgement()
122
        ack.SubscriptionId = self.subscription_id
123
        ack.SequenceNumber = publishresult.NotificationMessage.SequenceNumber
124
        await self.server.publish([ack])
125
126
    def _call_datachange(self, datachange: ua.DataChangeNotification):
127
        for item in datachange.MonitoredItems:
128
            if item.ClientHandle not in self._monitored_items:
129
                self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle)
130
                continue
131
            data = self._monitored_items[item.ClientHandle]
132
            if hasattr(self._handler, "datachange_notification"):
133
                event_data = DataChangeNotif(data, item)
134
                try:
135
                    self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
136
                except Exception:
137
                    self.logger.exception("Exception calling data change handler")
138
            elif hasattr(self._handler, "data_change"):  # deprecated API
139
                self.logger.warning("data_change method is deprecated, use datachange_notification")
140
                try:
141
                    self._handler.data_change(data.server_handle, data.node, item.Value.Value.Value, data.attribute)
142
                except Exception:
143
                    self.logger.exception("Exception calling deprecated data change handler")
144
            else:
145
                self.logger.error("DataChange subscription created but handler has no datachange_notification method")
146
147
    def _call_event(self, eventlist: ua.EventNotificationList):
148
        for event in eventlist.Events:
149
            data = self._monitored_items[event.ClientHandle]
150
            result = Event.from_event_fields(data.mfilter.SelectClauses, event.EventFields)
151
            result.server_handle = data.server_handle
152
            if hasattr(self._handler, "event_notification"):
153
                try:
154
                    self._handler.event_notification(result)
155
                except Exception:
156
                    self.logger.exception("Exception calling event handler")
157
            elif hasattr(self._handler, "event"):  # depcrecated API
158
                try:
159
                    self._handler.event(data.server_handle, result)
160
                except Exception:
161
                    self.logger.exception("Exception calling deprecated event handler")
162
            else:
163
                self.logger.error("Event subscription created but handler has no event_notification method")
164
165
    def _call_status(self, status: ua.StatusChangeNotification):
166
        try:
167
            self._handler.status_change_notification(status.Status)
168
        except Exception:
169
            self.logger.exception("Exception calling status change handler")
170
171
    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value, queuesize=0):
172
        """
173
        COROUTINE
174
        Subscribe for data change events for a node or list of nodes.
175
        default attribute is Value.
176
        Return a handle which can be used to unsubscribe
177
        If more control is necessary use create_monitored_items method
178
        """
179
        return self._subscribe(nodes, attr, queuesize=queuesize)
180
181
    async def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtypes=ua.ObjectIds.BaseEventType, evfilter=None,
182
            queuesize=0):
183
        """
184
        Subscribe to events from a node. Default node is Server node.
185
        In most servers the server node is the only one you can subscribe to.
186
        if evtypes is not provided, evtype defaults to BaseEventType
187
        if evtypes is a list or tuple of custom event types, the events will be filtered to the supplied types
188
        Return a handle which can be used to unsubscribe
189
        """
190
        sourcenode = Node(self.server, sourcenode)
191
        if evfilter is None:
192
            if not type(evtypes) in (list, tuple):
193
                evtypes = [evtypes]
194
            evtypes = [Node(self.server, evtype) for evtype in evtypes]
195
            evfilter = await get_filter_from_event_type(evtypes)
196
        return await self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter, queuesize=queuesize)
197
198
    async def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
199
        is_list = True
200
        if isinstance(nodes, collections.Iterable):
201
            nodes = list(nodes)
202
        else:
203
            nodes = [nodes]
204
            is_list = False
205
        mirs = []
206
        for node in nodes:
207
            mir = self._make_monitored_item_request(node, attr, mfilter, queuesize)
208
            mirs.append(mir)
209
        mids = await self.create_monitored_items(mirs)
210
        if is_list:
211
            return mids
212
        if type(mids[0]) == ua.StatusCode:
213
            mids[0].check()
214
        return mids[0]
215
216
    def _make_monitored_item_request(self, node, attr, mfilter, queuesize):
217
        rv = ua.ReadValueId()
218
        rv.NodeId = node.nodeid
219
        rv.AttributeId = attr
220
        # rv.IndexRange //We leave it null, then the entire array is returned
221
        mparams = ua.MonitoringParameters()
222
        self._client_handle += 1
223
        mparams.ClientHandle = self._client_handle
224
        mparams.SamplingInterval = self.parameters.RequestedPublishingInterval
225
        mparams.QueueSize = queuesize
226
        mparams.DiscardOldest = True
227
        if mfilter:
228
            mparams.Filter = mfilter
229
        mir = ua.MonitoredItemCreateRequest()
230
        mir.ItemToMonitor = rv
231
        mir.MonitoringMode = ua.MonitoringMode.Reporting
232
        mir.RequestedParameters = mparams
233
        return mir
234
235
    async def create_monitored_items(self, monitored_items):
236
        """
237
        low level method to have full control over subscription parameters
238
        Client handle must be unique since it will be used as key for internal registration of data
239
        """
240
        params = ua.CreateMonitoredItemsParameters()
241
        params.SubscriptionId = self.subscription_id
242
        params.ItemsToCreate = monitored_items
243
        params.TimestampsToReturn = ua.TimestampsToReturn.Both
244
        # insert monitored item into map to avoid notification arrive before result return
245
        # server_handle is left as None in purpose as we don't get it yet.
246
        for mi in monitored_items:
247
            data = SubscriptionItemData()
248
            data.client_handle = mi.RequestedParameters.ClientHandle
249
            data.node = Node(self.server, mi.ItemToMonitor.NodeId)
250
            data.attribute = mi.ItemToMonitor.AttributeId
251
            #TODO: Either use the filter from request or from response. Here it uses from request, in modify it uses from response
252
            data.mfilter = mi.RequestedParameters.Filter
253
            self._monitored_items[mi.RequestedParameters.ClientHandle] = data
254
        results = await self.server.create_monitored_items(params)
255
        mids = []
256
        # process result, add server_handle, or remove it if failed
257
        for idx, result in enumerate(results):
258
            mi = params.ItemsToCreate[idx]
259
            if not result.StatusCode.is_good():
260
                del self._monitored_items[mi.RequestedParameters.ClientHandle]
261
                mids.append(result.StatusCode)
262
                continue
263
            data = self._monitored_items[mi.RequestedParameters.ClientHandle]
264
            data.server_handle = result.MonitoredItemId
265
            mids.append(result.MonitoredItemId)
266
        return mids
267
268
    async def unsubscribe(self, handle):
269
        """
270
        unsubscribe to datachange or events using the handle returned while subscribing
271
        if you delete subscription, you do not need to unsubscribe
272
        """
273
        params = ua.DeleteMonitoredItemsParameters()
274
        params.SubscriptionId = self.subscription_id
275
        params.MonitoredItemIds = [handle]
276
        results = await self.server.delete_monitored_items(params)
277
        results[0].check()
278
        for k, v in self._monitored_items.items():
279
            if v.server_handle == handle:
280
                del(self._monitored_items[k])
281
                return
282
283
    async def modify_monitored_item(self, handle, new_samp_time, new_queuesize=0, mod_filter_val=-1):
284
        """
285
        Modify a monitored item.
286
        :param handle: Handle returned when originally subscribing
287
        :param new_samp_time: New wanted sample time
288
        :param new_queuesize: New wanted queuesize, default is 0
289
        :param mod_filter_val: New deadband filter value
290
        :return: Return a Modify Monitored Item Result
291
        """
292
        # Find the monitored item in the monitored item registry.
293
        item_to_change = next(item for item in self._monitored_items.values() if item.server_handle == handle)
294
        if not item_to_change:
295
            raise ValueError('The monitored item was not found.')
296
        if mod_filter_val is None:
297
            mod_filter = None
298
        elif mod_filter_val < 0:
299
            mod_filter = item_to_change.mfilter
300
        else:
301
            mod_filter = ua.DataChangeFilter()
302
            mod_filter.Trigger = ua.DataChangeTrigger(1)  # send notification when status or value change
303
            mod_filter.DeadbandType = 1
304
            mod_filter.DeadbandValue = mod_filter_val  # absolute float value or from 0 to 100 for percentage deadband
305
        modif_item = ua.MonitoredItemModifyRequest()
306
        modif_item.MonitoredItemId = handle
307
        modif_item.RequestedParameters = self._modify_monitored_item_request(
308
            new_queuesize, new_samp_time, mod_filter, item_to_change.client_handle
309
        )
310
        params = ua.ModifyMonitoredItemsParameters()
311
        params.SubscriptionId = self.subscription_id
312
        params.ItemsToModify.append(modif_item)
313
        results = await self.server.modify_monitored_items(params)
314
        item_to_change.mfilter = results[0].FilterResult
315
        return results
316
317
    def _modify_monitored_item_request(self, new_queuesize, new_samp_time, mod_filter, client_handle):
318
        req_params = ua.MonitoringParameters()
319
        req_params.ClientHandle = client_handle
320
        req_params.QueueSize = new_queuesize
321
        req_params.Filter = mod_filter
322
        req_params.SamplingInterval = new_samp_time
323
        return req_params
324
325
    def deadband_monitor(self, var, deadband_val, deadbandtype=1, queuesize=0, attr=ua.AttributeIds.Value):
326
        """
327
        Method to create a subscription with a Deadband Value.
328
        Default deadband value type is absolute.
329
        Return a handle which can be used to unsubscribe
330
        :param var: Variable to which you want to subscribe
331
        :param deadband_val: Absolute float value
332
        :param deadbandtype: Default value is 1 (absolute), change to 2 for percentage deadband
333
        :param queuesize: Wanted queue size, default is 1
334
        :param attr: Attribute ID
335
        """
336
        deadband_filter = ua.DataChangeFilter()
337
        deadband_filter.Trigger = ua.DataChangeTrigger(1)  # send notification when status or value change
338
        deadband_filter.DeadbandType = deadbandtype
339
        deadband_filter.DeadbandValue = deadband_val  # absolute float value or from 0 to 100 for percentage deadband
340
        return self._subscribe(var, attr, deadband_filter, queuesize)
341