Completed
Pull Request — master (#179)
by Olivier
03:27
created

Subscription   D

Complexity

Total Complexity 60

Size/Duplication

Total Lines 260
Duplicated Lines 3.85 %

Test Coverage

Coverage 87.88%

Importance

Changes 6
Bugs 0 Features 0
Metric Value
c 6
b 0
f 0
dl 10
loc 260
ccs 174
cts 198
cp 0.8788
rs 4.2857
wmc 60

17 Methods

Rating  Name                             Duplication  Size  Complexity
B       publish_callback()               0            19    6
A       delete()                         0            6     1
D       _call_event()                    0            23    9
C       _call_datachange()               0            21    8
A       subscribe_data_change()          0            8     1
A       _call_status()                   0            5     2
A       _get_node()                      0            8     3
A       __init__()                       0            13    1
A       _where_clause_from_evtype()      0            15    3
A       _make_monitored_item_request()   0            19    3
A       _get_filter_from_event_type()    0            6     1
A       subscribe_events()               0            11    2
B       create_monitored_items()         10           34    6
A       unsubscribe()                    0            15    4
A       _get_subtypes()                  0            7     3
A       _select_clauses_from_evtype()    0            9     2
B       _subscribe()                     0            16    5

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complex Class

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like Subscription often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common way to find such a component is to look for fields and methods that share a prefix or a suffix.
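The prefix/suffix heuristic can be tried directly on the method names listed in the report. The following is only a minimal sketch: the helper `group_by_affix` is a hypothetical name introduced for illustration, and splitting names on underscores is an assumption about how "prefix" and "suffix" are best approximated here.

```python
from collections import defaultdict

# Method names copied from the report's method table for Subscription.
METHOD_NAMES = [
    "publish_callback", "delete", "_call_event", "_call_datachange",
    "subscribe_data_change", "_call_status", "_get_node", "__init__",
    "_where_clause_from_evtype", "_make_monitored_item_request",
    "_get_filter_from_event_type", "subscribe_events",
    "create_monitored_items", "unsubscribe", "_get_subtypes",
    "_select_clauses_from_evtype", "_subscribe",
]


def group_by_affix(names, position):
    """Group names by their first (position=0) or last (position=-1) word.

    Hypothetical helper: leading/trailing underscores are ignored and the
    remaining name is split on underscores.
    """
    groups = defaultdict(list)
    for name in names:
        words = name.strip("_").split("_")
        groups[words[position]].append(name)
    # Keep only affixes shared by at least two methods.
    return {affix: members for affix, members in groups.items() if len(members) > 1}


prefix_groups = group_by_affix(METHOD_NAMES, 0)
suffix_groups = group_by_affix(METHOD_NAMES, -1)

for affix, members in sorted(prefix_groups.items()):
    print("prefix", affix, members)
for affix, members in sorted(suffix_groups.items()):
    print("suffix", affix, members)
```

Groups such as the `_call_*` methods or the `*_evtype` methods are only candidates; whether they form a cohesive component still has to be judged by reading the code.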

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
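As an illustration of Extract Class, the sketch below moves the status-notification call out of Subscription into a separate class. It is a simplified assumption, not the project's actual design: `NotificationDispatcher` and its `call_status` method are hypothetical names, and the `Subscription` shown here is a trimmed stand-in that keeps only the delegation.

```python
import logging


class NotificationDispatcher(object):
    """Hypothetical extracted class: owns the handler and the guarded calls to it."""

    def __init__(self, handler, logger=None):
        self._handler = handler
        self.logger = logger or logging.getLogger(__name__)

    def call_status(self, status_code):
        # Same guard as in the original _call_status: an exception raised by
        # the user's handler is logged instead of propagating to the caller.
        try:
            self._handler.status_change_notification(status_code)
        except Exception:
            self.logger.exception("Exception calling status change handler")


class Subscription(object):
    """Trimmed stand-in for the real class, showing only the delegation."""

    def __init__(self, handler):
        self._dispatcher = NotificationDispatcher(handler)

    def _call_status(self, status):
        # The subscription keeps its method but forwards the work.
        self._dispatcher.call_status(status.Status)
```

The same move could be repeated for the other `_call_*` methods, which would take the two most complex methods in the report out of Subscription.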

"""
high level interface to subscriptions
"""
import time
import logging
from threading import Lock

from opcua import ua
from opcua import Node


class SubHandler(object):
    """
    Subscription Handler. To receive events from server for a subscription
    This class is just a sample class. Any class that has these methods can be used
    """

    def data_change(self, handle, node, val, attr):
        """
        Deprecated, use datachange_notification
        """
        pass

    def datachange_notification(self, node, val, data):
        """
        called for every datachange notification from server
        """
        pass

    def event_notification(self, event):
        """
        called for every event notification from server
        """
        pass

    def status_change_notification(self, status):
        """
        called for every status change notification from server
        """
        pass


class EventResult(object):
    """
    To be sent to clients for every event from server
    """

    def __init__(self):
        self.server_handle = None

    def __str__(self):
        return "EventResult({})".format([str(k) + ":" + str(v) for k, v in self.__dict__.items()])
    __repr__ = __str__

    def get_event_props_as_fields_dict(self):
        """
        convert all properties of the EventResult class to a dict of variants
        """
        field_vars = {}
        for key, value in vars(self).items():
            # compare by value: "is not" tests object identity, not string equality
            if not key.startswith("__") and key != "server_handle":
                field_vars[key] = ua.Variant(value)
        return field_vars


class SubscriptionItemData(object):
    """
    To store useful data from a monitored item
    """
    def __init__(self):
        self.node = None
        self.client_handle = None
        self.server_handle = None
        self.attribute = None
        self.mfilter = None


class DataChangeNotif(object):
    """
    To be sent to clients for every datachange notification from server
    """
    def __init__(self, subscription_data, monitored_item):
        self.monitored_item = monitored_item
        self.subscription_data = subscription_data

    def __str__(self):
        return "DataChangeNotification({}, {})".format(self.subscription_data, self.monitored_item)
    __repr__ = __str__


class Subscription(object):
    """
    Subscription object returned by Server or Client objects.
    The object represents a subscription to an opc-ua server.
    This is a high level class, especially subscribe_data_change
    and subscribe_events methods. If more control is necessary look at
    code and/or use create_monitored_items method.
    """

    def __init__(self, server, params, handler):
        self.logger = logging.getLogger(__name__)
        self.server = server
        self._client_handle = 200
        self._handler = handler
        self.parameters = params  # move to data class
        self._monitoreditems_map = {}
        self._lock = Lock()
        self.subscription_id = None
        response = self.server.create_subscription(params, self.publish_callback)
        self.subscription_id = response.SubscriptionId  # move to data class
        self.server.publish()
        self.server.publish()

    def delete(self):
        """
        Delete subscription on server. This is automatically done by Client and Server classes on exit
        """
        results = self.server.delete_subscriptions([self.subscription_id])
        results[0].check()

    def publish_callback(self, publishresult):
        self.logger.info("Publish callback called with result: %s", publishresult)
        while self.subscription_id is None:
            time.sleep(0.01)

        for notif in publishresult.NotificationMessage.NotificationData:
            if isinstance(notif, ua.DataChangeNotification):
                self._call_datachange(notif)
            elif isinstance(notif, ua.EventNotificationList):
                self._call_event(notif)
            elif isinstance(notif, ua.StatusChangeNotification):
                self._call_status(notif)
            else:
                self.logger.warning("Notification type not supported yet for notification %s", notif)

        ack = ua.SubscriptionAcknowledgement()
        ack.SubscriptionId = self.subscription_id
        ack.SequenceNumber = publishresult.NotificationMessage.SequenceNumber
        self.server.publish([ack])

    def _call_datachange(self, datachange):
        for item in datachange.MonitoredItems:
            with self._lock:
                if item.ClientHandle not in self._monitoreditems_map:
                    self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle)
                    continue
                data = self._monitoreditems_map[item.ClientHandle]
            if hasattr(self._handler, "datachange_notification"):
                event_data = DataChangeNotif(data, item)
                try:
                    self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
                except Exception:
                    self.logger.exception("Exception calling data change handler")
            elif hasattr(self._handler, "data_change"):  # deprecated API
                self.logger.warning("data_change method is deprecated, use datachange_notification")
                try:
                    self._handler.data_change(data.server_handle, data.node, item.Value.Value.Value, data.attribute)
                except Exception:
                    self.logger.exception("Exception calling deprecated data change handler")
            else:
                self.logger.error("DataChange subscription created but handler has no datachange_notification method")

    def _call_event(self, eventlist):
        for event in eventlist.Events:
            with self._lock:
                data = self._monitoreditems_map[event.ClientHandle]
            result = EventResult()
            result.server_handle = data.server_handle
            for idx, sattr in enumerate(data.mfilter.SelectClauses):
                if len(sattr.BrowsePath) == 0:
                    setattr(result, sattr.AttributeId.name, event.EventFields[idx].Value)
                else:
                    setattr(result, sattr.BrowsePath[0].Name, event.EventFields[idx].Value)
            if hasattr(self._handler, "event_notification"):
                try:
                    self._handler.event_notification(result)
                except Exception:
                    self.logger.exception("Exception calling event handler")
            elif hasattr(self._handler, "event"):  # deprecated API
                try:
                    self._handler.event(data.server_handle, result)
                except Exception:
                    self.logger.exception("Exception calling deprecated event handler")
            else:
                self.logger.error("Event subscription created but handler has no event_notification method")

    def _call_status(self, status):
        try:
            self._handler.status_change_notification(status.Status)
        except Exception:
            self.logger.exception("Exception calling status change handler")

    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value):
        """
        Subscribe for data change events for a node or list of nodes.
        default attribute is Value.
        Return a handle which can be used to unsubscribe
        If more control is necessary use create_monitored_items method
        """
        return self._subscribe(nodes, attr, queuesize=0)

    def _get_node(self, nodeid):
        if isinstance(nodeid, ua.NodeId):
            node = Node(self.server, nodeid)
        elif isinstance(nodeid, Node):
            node = nodeid
        else:
            node = Node(self.server, ua.NodeId(nodeid))
        return node

    def _get_filter_from_event_type(self, eventtype):
        eventtype = self._get_node(eventtype)
        evfilter = ua.EventFilter()
        evfilter.SelectClauses = self._select_clauses_from_evtype(eventtype)
        evfilter.WhereClause = self._where_clause_from_evtype(eventtype)
        return evfilter

    def _select_clauses_from_evtype(self, evtype):
        clauses = []
        # "prop" rather than "property", to avoid shadowing the builtin
        for prop in get_event_properties_from_type_node(evtype):
            op = ua.SimpleAttributeOperand()
            op.TypeDefinitionId = evtype.nodeid
            op.AttributeId = ua.AttributeIds.Value
            op.BrowsePath = [prop.get_browse_name()]
            clauses.append(op)
        return clauses

    def _where_clause_from_evtype(self, evtype):
        cf = ua.ContentFilter()
        el = ua.ContentFilterElement()
        # operands can be ElementOperand, LiteralOperand, AttributeOperand, SimpleAttribute
        op = ua.SimpleAttributeOperand()
        op.AttributeId = ua.AttributeIds.NodeId
        el.FilterOperands.append(op)
        for subtypeid in [st.nodeid for st in self._get_subtypes(evtype)]:
            op = ua.LiteralOperand()
            op.Value = ua.Variant(subtypeid)
            el.FilterOperands.append(op)
        el.FilterOperator = ua.FilterOperator.InList

        cf.Elements.append(el)
        return cf

    def _get_subtypes(self, parent, nodes=None):
        if nodes is None:
            nodes = [parent]
        for child in parent.get_children(refs=ua.ObjectIds.HasSubtype):
            nodes.append(child)
            self._get_subtypes(child, nodes)
        return nodes

    def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtype=ua.ObjectIds.BaseEventType, evfilter=None):
        """
        Subscribe to events from a node. Default node is Server node.
        In most servers the server node is the only one you can subscribe to.
        if evfilter is provided, evtype is ignored
        Return a handle which can be used to unsubscribe
        """
        sourcenode = self._get_node(sourcenode)
        if evfilter is None:
            evfilter = self._get_filter_from_event_type(evtype)
        return self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter)

    def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
        is_list = True
        if not isinstance(nodes, (list, tuple)):
            is_list = False
            nodes = [nodes]
        mirs = []
        for node in nodes:
            mir = self._make_monitored_item_request(node, attr, mfilter, queuesize)
            mirs.append(mir)

        mids = self.create_monitored_items(mirs)
        if is_list:
            return mids
        if isinstance(mids[0], ua.StatusCode):
            mids[0].check()
        return mids[0]

    def _make_monitored_item_request(self, node, attr, mfilter, queuesize):
        rv = ua.ReadValueId()
        rv.NodeId = node.nodeid
        rv.AttributeId = attr
        # rv.IndexRange //We leave it null, then the entire array is returned
        mparams = ua.MonitoringParameters()
        with self._lock:
            self._client_handle += 1
            mparams.ClientHandle = self._client_handle
        mparams.SamplingInterval = self.parameters.RequestedPublishingInterval
        mparams.QueueSize = queuesize
        mparams.DiscardOldest = True
        if mfilter:
            mparams.Filter = mfilter
        mir = ua.MonitoredItemCreateRequest()
        mir.ItemToMonitor = rv
        mir.MonitoringMode = ua.MonitoringMode.Reporting
        mir.RequestedParameters = mparams
        return mir

    def create_monitored_items(self, monitored_items):
        """
        low level method to have full control over subscription parameters
        Client handle must be unique since it will be used as key for internal registration of data
        """
        params = ua.CreateMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToCreate = monitored_items
        params.TimestampsToReturn = ua.TimestampsToReturn.Neither

        # insert monitored items into the map first, so that a notification cannot arrive before the result returns
        # server_handle is left as None on purpose as we don't have it yet.
        with self._lock:
            for mi in monitored_items:
                data = SubscriptionItemData()
                data.client_handle = mi.RequestedParameters.ClientHandle
                data.node = Node(self.server, mi.ItemToMonitor.NodeId)
                data.attribute = mi.ItemToMonitor.AttributeId
                data.mfilter = mi.RequestedParameters.Filter
                self._monitoreditems_map[mi.RequestedParameters.ClientHandle] = data
        results = self.server.create_monitored_items(params)
        mids = []
        # process result, add server_handle, or remove it if failed
        with self._lock:
            for idx, result in enumerate(results):
                mi = params.ItemsToCreate[idx]
                if not result.StatusCode.is_good():
                    del self._monitoreditems_map[mi.RequestedParameters.ClientHandle]
                    mids.append(result.StatusCode)
                    continue
                data = self._monitoreditems_map[mi.RequestedParameters.ClientHandle]
                data.server_handle = result.MonitoredItemId
                mids.append(result.MonitoredItemId)
        return mids

    def unsubscribe(self, handle):
        """
        unsubscribe from datachange or events using the handle returned while subscribing
        if you delete subscription, you do not need to unsubscribe
        """
        params = ua.DeleteMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.MonitoredItemIds = [handle]
        results = self.server.delete_monitored_items(params)
        results[0].check()
        with self._lock:
            for k, v in self._monitoreditems_map.items():
                if v.server_handle == handle:
                    # safe only because we return right after deleting
                    del self._monitoreditems_map[k]
                    return


def get_event_properties_from_type_node(node):
    properties = []
    curr_node = node

    while True:
        properties.extend(curr_node.get_properties())

        if curr_node.nodeid.Identifier == ua.ObjectIds.BaseEventType:
            break

        parents = curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
        if len(parents) != 1:  # Something went wrong
            return None
        curr_node = parents[0]

    return properties
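The SubHandler docstring in the listing says the class is only a sample and that other classes providing the same methods can be used as handlers. A minimal sketch of such a handler follows. `RecordingHandler` is a hypothetical name, and the direct calls at the bottom are stand-ins for the calls a Subscription object would make; no server or `opcua` import is involved.

```python
class RecordingHandler(object):
    """Hypothetical handler: stores every notification it receives in a list."""

    def __init__(self):
        self.received = []

    def datachange_notification(self, node, val, data):
        # Same signature as SubHandler.datachange_notification in the listing.
        self.received.append(("datachange", node, val))

    def event_notification(self, event):
        self.received.append(("event", event))

    def status_change_notification(self, status):
        self.received.append(("status", status))


# Stand-in calls with placeholder values; in real use the subscription's
# publish callback would invoke these methods.
handler = RecordingHandler()
handler.datachange_notification("node-1", 42, None)
handler.status_change_notification("Good")
print(handler.received)
```

Because the listing checks for each method with `hasattr` before calling the data change and event handlers, a handler of this kind does not need to inherit from SubHandler.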