Completed
Pull Request — master (#179)
by Olivier
04:25
created

Subscription   D

Complexity

Total Complexity 60

Size/Duplication

Total Lines 260
Duplicated Lines 3.85 %

Test Coverage

Coverage 88.38%

Importance

Changes 6
Bugs 0 Features 0
Metric Value
c 6
b 0
f 0
dl 10
loc 260
ccs 175
cts 198
cp 0.8838
rs 4.2857
wmc 60

17 Methods

Rating   Name   Duplication   Size   Complexity  
A delete() 0 6 1
A subscribe_data_change() 0 8 1
A _call_status() 0 5 2
A _get_node() 0 8 3
A __init__() 0 13 1
C _call_datachange() 0 21 8
D _call_event() 0 23 9
B publish_callback() 0 19 6
A _get_filter_from_event_type() 0 6 1
A _select_clauses_from_evtype() 0 9 2
A _where_clause_from_evtype() 0 15 3
A _make_monitored_item_request() 0 19 3
A subscribe_events() 0 11 2
B create_monitored_items() 10 34 6
A unsubscribe() 0 15 4
A _get_subtypes() 0 7 3
B _subscribe() 0 16 5


Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
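As an illustration only: one pattern that repeats in the listing further down is the "look up a handler method, call it, log any exception" sequence in _call_datachange() and _call_event(). The sketch below shows how such repetition could be folded into one helper. The names call_handler and RecordingHandler are hypothetical and do not exist in the reviewed file.

```python
import logging

logger = logging.getLogger(__name__)


def call_handler(handler, method_name, *args):
    """Call handler.<method_name>(*args), logging errors instead of raising.

    Returns True if the method exists (even if it raised), False otherwise.
    Hypothetical helper, not part of the reviewed module.
    """
    method = getattr(handler, method_name, None)
    if method is None:
        return False
    try:
        method(*args)
    except Exception:
        # Same policy as the listing: a failing user callback is logged,
        # it must not break the notification loop.
        logger.exception("Exception calling %s handler", method_name)
    return True


class RecordingHandler(object):
    """Hypothetical handler used only for this illustration."""

    def __init__(self):
        self.received = []

    def event_notification(self, event):
        self.received.append(event)


handler = RecordingHandler()
called = call_handler(handler, "event_notification", "some-event")
missing = call_handler(handler, "datachange_notification", None, 1, None)
```

With a helper like this, each call site shrinks to a chain of call_handler() attempts (new API first, deprecated API second) followed by a single error log when nothing matched.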

Common duplication problems, and corresponding solutions are:

Complex Class

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like Subscription often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes. In Subscription, for instance, _select_clauses_from_evtype() and _where_clause_from_evtype() share the _from_evtype suffix.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
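A minimal sketch of what Extract Class could look like here, assuming the event-filter methods are the cohesive component. EventFilterBuilder is a hypothetical name, and EventFilter is a stand-in for ua.EventFilter so that the example runs without python-opcua; the tuples used for clauses are placeholders, not the real operand types.

```python
class EventFilter(object):
    """Stand-in for ua.EventFilter (assumption, for a runnable sketch)."""

    def __init__(self):
        self.SelectClauses = []
        self.WhereClause = None


class EventFilterBuilder(object):
    """Hypothetical extracted class owning the *_from_evtype logic."""

    def __init__(self, get_properties, get_subtypes):
        # Both lookups are injected, so the builder needs no server access
        # and can be tested in isolation.
        self._get_properties = get_properties
        self._get_subtypes = get_subtypes

    def select_clauses(self, evtype):
        # One clause per property of the event type.
        return [("Value", name) for name in self._get_properties(evtype)]

    def where_clause(self, evtype):
        # Accept the event type itself and all of its subtypes.
        return ("InList", list(self._get_subtypes(evtype)))

    def build(self, evtype):
        evfilter = EventFilter()
        evfilter.SelectClauses = self.select_clauses(evtype)
        evfilter.WhereClause = self.where_clause(evtype)
        return evfilter


# Toy data standing in for the server address space.
PROPERTIES = {"BaseEventType": ["EventId", "Severity"]}
SUBTYPES = {"BaseEventType": ["BaseEventType", "AuditEventType"]}

builder = EventFilterBuilder(PROPERTIES.get, SUBTYPES.get)
evfilter = builder.build("BaseEventType")
```

Subscription would then keep a single call such as builder.build(evtype) in subscribe_events(), and three private methods would leave the class.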

"""
high level interface to subscriptions
"""
import time
import logging
from threading import Lock

from opcua import ua
from opcua import Node


class SubHandler(object):
    """
    Subscription Handler. To receive events from server for a subscription
    This class is just a sample class. Any class having these methods can be used
    """

    def data_change(self, handle, node, val, attr):
        """
        Deprecated, use datachange_notification
        """
        pass

    def datachange_notification(self, node, val, data):
        """
        called for every datachange notification from server
        """
        pass

    def event_notification(self, event):
        """
        called for every event notification from server
        """
        pass

    def status_change_notification(self, status):
        """
        called for every status change notification from server
        """
        pass


class EventResult(object):
    """
    To be sent to clients for every event from server
    """

    def __init__(self):
        self.server_handle = None

    def __str__(self):
        return "EventResult({})".format([str(k) + ":" + str(v) for k, v in self.__dict__.items()])
    __repr__ = __str__


class SubscriptionItemData(object):
    """
    To store useful data from a monitored item
    """
    def __init__(self):
        self.node = None
        self.client_handle = None
        self.server_handle = None
        self.attribute = None
        self.mfilter = None


class DataChangeNotif(object):
    """
    To be sent to clients for every datachange notification from server
    """
    def __init__(self, subscription_data, monitored_item):
        self.monitored_item = monitored_item
        self.subscription_data = subscription_data

    def __str__(self):
        return "DataChangeNotification({}, {})".format(self.subscription_data, self.monitored_item)
    __repr__ = __str__


class Subscription(object):
    """
    Subscription object returned by Server or Client objects.
    The object represents a subscription to an opc-ua server.
    This is a high level class, especially subscribe_data_change
    and subscribe_events methods. If more control is necessary look at
    code and/or use create_monitored_items method.
    """

    def __init__(self, server, params, handler):
        self.logger = logging.getLogger(__name__)
        self.server = server
        self._client_handle = 200
        self._handler = handler
        self.parameters = params  # move to data class
        self._monitoreditems_map = {}
        self._lock = Lock()
        self.subscription_id = None
        response = self.server.create_subscription(params, self.publish_callback)
        self.subscription_id = response.SubscriptionId  # move to data class
        self.server.publish()
        self.server.publish()

    def delete(self):
        """
        Delete subscription on server. This is automatically done by Client and Server classes on exit
        """
        results = self.server.delete_subscriptions([self.subscription_id])
        results[0].check()

    def publish_callback(self, publishresult):
        self.logger.info("Publish callback called with result: %s", publishresult)
        while self.subscription_id is None:
            time.sleep(0.01)

        for notif in publishresult.NotificationMessage.NotificationData:
            if isinstance(notif, ua.DataChangeNotification):
                self._call_datachange(notif)
            elif isinstance(notif, ua.EventNotificationList):
                self._call_event(notif)
            elif isinstance(notif, ua.StatusChangeNotification):
                self._call_status(notif)
            else:
                self.logger.warning("Notification type not supported yet for notification %s", notif)

        ack = ua.SubscriptionAcknowledgement()
        ack.SubscriptionId = self.subscription_id
        ack.SequenceNumber = publishresult.NotificationMessage.SequenceNumber
        self.server.publish([ack])

    def _call_datachange(self, datachange):
        for item in datachange.MonitoredItems:
            with self._lock:
                if item.ClientHandle not in self._monitoreditems_map:
                    self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle)
                    continue
                data = self._monitoreditems_map[item.ClientHandle]
            if hasattr(self._handler, "datachange_notification"):
                event_data = DataChangeNotif(data, item)
                try:
                    self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
                except Exception:
                    self.logger.exception("Exception calling data change handler")
            elif hasattr(self._handler, "data_change"):  # deprecated API
                self.logger.warning("data_change method is deprecated, use datachange_notification")
                try:
                    self._handler.data_change(data.server_handle, data.node, item.Value.Value.Value, data.attribute)
                except Exception:
                    self.logger.exception("Exception calling deprecated data change handler")
            else:
                self.logger.error("DataChange subscription created but handler has no datachange_notification method")

    def _call_event(self, eventlist):
        for event in eventlist.Events:
            with self._lock:
                data = self._monitoreditems_map[event.ClientHandle]
            result = EventResult()
            result.server_handle = data.server_handle
            for idx, sattr in enumerate(data.mfilter.SelectClauses):
                if len(sattr.BrowsePath) == 0:
                    setattr(result, sattr.AttributeId.name, event.EventFields[idx].Value)
                else:
                    setattr(result, sattr.BrowsePath[0].Name, event.EventFields[idx].Value)
            if hasattr(self._handler, "event_notification"):
                try:
                    self._handler.event_notification(result)
                except Exception:
                    self.logger.exception("Exception calling event handler")
            elif hasattr(self._handler, "event"):  # deprecated API
                try:
                    self._handler.event(data.server_handle, result)
                except Exception:
                    self.logger.exception("Exception calling deprecated event handler")
            else:
                self.logger.error("Event subscription created but handler has no event_notification method")

    def _call_status(self, status):
        try:
            self._handler.status_change_notification(status.Status)
        except Exception:
            self.logger.exception("Exception calling status change handler")

    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value):
        """
        Subscribe for data change events for a node or list of nodes.
        default attribute is Value.
        Return a handle which can be used to unsubscribe
        If more control is necessary use create_monitored_items method
        """
        return self._subscribe(nodes, attr, queuesize=0)

    def _get_node(self, nodeid):
        if isinstance(nodeid, ua.NodeId):
            node = Node(self.server, nodeid)
        elif isinstance(nodeid, Node):
            node = nodeid
        else:
            node = Node(self.server, ua.NodeId(nodeid))
        return node

    def _get_filter_from_event_type(self, eventtype):
        eventtype = self._get_node(eventtype)
        evfilter = ua.EventFilter()
        evfilter.SelectClauses = self._select_clauses_from_evtype(eventtype)
        evfilter.WhereClause = self._where_clause_from_evtype(eventtype)
        return evfilter

    def _select_clauses_from_evtype(self, evtype):
        clauses = []
        for property in get_event_properties_from_type_node(evtype):
            op = ua.SimpleAttributeOperand()
            op.TypeDefinitionId = evtype.nodeid
            op.AttributeId = ua.AttributeIds.Value
            op.BrowsePath = [property.get_browse_name()]
            clauses.append(op)
        return clauses

    def _where_clause_from_evtype(self, evtype):
        cf = ua.ContentFilter()
        el = ua.ContentFilterElement()
        # operands can be ElementOperand, LiteralOperand, AttributeOperand, SimpleAttribute
        op = ua.SimpleAttributeOperand()
        op.AttributeId = ua.AttributeIds.NodeId
        el.FilterOperands.append(op)
        for subtypeid in [st.nodeid for st in self._get_subtypes(evtype)]:
            op = ua.LiteralOperand()
            op.Value = subtypeid
            el.FilterOperands.append(op)
        el.FilterOperator = ua.FilterOperator.InList

        cf.Elements.append(el)
        return cf

    def _get_subtypes(self, parent, nodes=None):
        if nodes is None:
            nodes = [parent]
        for child in parent.get_children(refs=ua.ObjectIds.HasSubtype):
            nodes.append(child)
            self._get_subtypes(child, nodes)
        return nodes

    def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtype=ua.ObjectIds.BaseEventType, evfilter=None):
        """
        Subscribe to events from a node. Default node is Server node.
        In most servers the server node is the only one you can subscribe to.
        if evfilter is provided, evtype is ignored
        Return a handle which can be used to unsubscribe
        """
        sourcenode = self._get_node(sourcenode)
        if evfilter is None:
            evfilter = self._get_filter_from_event_type(evtype)
        return self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter)

    def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
        is_list = True
        if not type(nodes) in (list, tuple):
            is_list = False
            nodes = [nodes]
        mirs = []
        for node in nodes:
            mir = self._make_monitored_item_request(node, attr, mfilter, queuesize)
            mirs.append(mir)

        mids = self.create_monitored_items(mirs)
        if is_list:
            return mids
        if type(mids[0]) == ua.StatusCode:
            mids[0].check()
        return mids[0]

    def _make_monitored_item_request(self, node, attr, mfilter, queuesize):
        rv = ua.ReadValueId()
        rv.NodeId = node.nodeid
        rv.AttributeId = attr
        # rv.IndexRange //We leave it null, then the entire array is returned
        mparams = ua.MonitoringParameters()
        with self._lock:
            self._client_handle += 1
            mparams.ClientHandle = self._client_handle
        mparams.SamplingInterval = self.parameters.RequestedPublishingInterval
        mparams.QueueSize = queuesize
        mparams.DiscardOldest = True
        if mfilter:
            mparams.Filter = mfilter
        mir = ua.MonitoredItemCreateRequest()
        mir.ItemToMonitor = rv
        mir.MonitoringMode = ua.MonitoringMode.Reporting
        mir.RequestedParameters = mparams
        return mir

    def create_monitored_items(self, monitored_items):
        """
        low level method to have full control over subscription parameters
        Client handle must be unique since it will be used as key for internal registration of data
        """
        params = ua.CreateMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToCreate = monitored_items
        params.TimestampsToReturn = ua.TimestampsToReturn.Neither

        # insert monitored item into map to avoid a notification arriving before the result returns
        # server_handle is left as None on purpose as we don't have it yet.
        with self._lock:
            for mi in monitored_items:
                data = SubscriptionItemData()
                data.client_handle = mi.RequestedParameters.ClientHandle
                data.node = Node(self.server, mi.ItemToMonitor.NodeId)
                data.attribute = mi.ItemToMonitor.AttributeId
                data.mfilter = mi.RequestedParameters.Filter
                self._monitoreditems_map[mi.RequestedParameters.ClientHandle] = data
        results = self.server.create_monitored_items(params)
        mids = []
        # process result, add server_handle, or remove it if failed
        with self._lock:
            # Review note (Duplication): this code seems to be duplicated in your project.
            for idx, result in enumerate(results):
                mi = params.ItemsToCreate[idx]
                if not result.StatusCode.is_good():
                    del self._monitoreditems_map[mi.RequestedParameters.ClientHandle]
                    mids.append(result.StatusCode)
                    continue
                data = self._monitoreditems_map[mi.RequestedParameters.ClientHandle]
                data.server_handle = result.MonitoredItemId
                mids.append(result.MonitoredItemId)
        return mids

    def unsubscribe(self, handle):
        """
        unsubscribe to datachange or events using the handle returned while subscribing
        if you delete subscription, you do not need to unsubscribe
        """
        params = ua.DeleteMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.MonitoredItemIds = [handle]
        results = self.server.delete_monitored_items(params)
        results[0].check()
        with self._lock:
            for k, v in self._monitoreditems_map.items():
                if v.server_handle == handle:
                    del(self._monitoreditems_map[k])
                    return


def get_event_properties_from_type_node(node):
    properties = []
    curr_node = node

    while True:
        properties.extend(curr_node.get_properties())

        if curr_node.nodeid.Identifier == ua.ObjectIds.BaseEventType:
            break

        parents = curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
        if len(parents) != 1:  # Something went wrong
            return None
        curr_node = parents[0]

    return properties
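
A note on the last function: it returns None when a type does not have exactly one supertype, while _select_clauses_from_evtype() iterates over the result directly, so that path would presumably end in a TypeError. The sketch below shows the same walk up the type hierarchy over toy dictionaries, raising an explicit error instead. All names here (collect_inherited_properties, PARENTS, PROPERTIES) are hypothetical and stand in for the node browsing done in the real code.

```python
def collect_inherited_properties(type_name, parents, properties, root="BaseEventType"):
    """Gather properties from type_name up to root, most-derived type first.

    parents maps a type to its single supertype; properties maps a type to
    the property names it declares. Raises ValueError when the chain breaks
    before reaching root, instead of returning None.
    """
    collected = []
    current = type_name
    while True:
        collected.extend(properties.get(current, []))
        if current == root:
            return collected
        if current not in parents:
            raise ValueError("type %r has no single supertype" % (current,))
        current = parents[current]


# Toy hierarchy standing in for HasSubtype references on a server.
PARENTS = {
    "AuditEventType": "BaseEventType",
    "AuditSecurityEventType": "AuditEventType",
}
PROPERTIES = {
    "BaseEventType": ["EventId", "Severity"],
    "AuditEventType": ["ActionTimeStamp"],
    "AuditSecurityEventType": [],
}

inherited = collect_inherited_properties("AuditSecurityEventType", PARENTS, PROPERTIES)
```

Whether to raise or to return an empty list is a design choice; raising keeps the failure visible at the point where the hierarchy is inconsistent.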