Completed
Pull Request — master (#180)
by
unknown
03:21
created

Subscription   B

Complexity

Total Complexity 52

Size/Duplication

Total Lines 228
Duplicated Lines 1.75 %

Test Coverage

Coverage 86.05%

Importance

Changes 4
Bugs 0 Features 0
Metric Value
c 4
b 0
f 0
dl 4
loc 228
ccs 148
cts 172
cp 0.8605
rs 7.9487
wmc 52

14 Methods

Rating   Name   Duplication   Size   Complexity  
B create_monitored_items() 0 34 6
B publish_callback() 0 19 6
A unsubscribe() 4 15 4
A delete() 0 6 1
D _call_event() 0 23 9
A _make_monitored_item_request() 0 19 3
A _get_filter_from_event_type() 0 10 2
A subscribe_events() 0 9 1
C _call_datachange() 0 21 8
A subscribe_data_change() 0 8 1
A _call_status() 0 5 2
A _get_node() 0 8 3
B _subscribe() 0 16 5
A __init__() 0 13 1

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complex Class

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like Subscription often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
high level interface to subscriptions
3
"""
4 1
import time
5 1
import logging
6 1
from threading import Lock
7
8 1
from opcua import ua
9 1
from opcua import Node
10
11
12 1
class SubHandler(object):
13
    """
14
    Subscription Handler. To receive events from server for a subscription
15
    This class is just a sample class. Whatever class having these methods can be used
16
    """
17
18 1
    def data_change(self, handle, node, val, attr):
19
        """
20
        Deprecated, use datachange_notification
21
        """
22
        pass
23
24 1
    def datachange_notification(self, node, val, data):
25
        """
26
        called for every datachange notification from server
27
        """
28
        pass
29
30 1
    def event_notification(self, event):
31
        """
32
        called for every event notification from server
33
        """
34
        pass
35
36 1
    def status_change_notification(self, status):
37
        """
38
        called for every status change notification from server
39
        """
40
        pass
41
42
43 1
class EventResult(object):
44
    """
45
    To be sent to clients for every events from server
46
    """
47
48 1
    def __init__(self):
49 1
        self.server_handle = None
50
51 1
    def __str__(self):
52
        return "EventResult({})".format([str(k) + ":" + str(v) for k, v in self.__dict__.items()])
53 1
    __repr__ = __str__
54
55 1
    def get_event_props_as_fields_dict(self):
56
        """
57
        convert all properties of the EventResult class to a dict of variants
58
        """
59 1
        field_vars = {}
60 1
        for key, value in vars(self).items():
61 1
            if not key.startswith("__") and key is not "server_handle":
62 1
                field_vars[key] = ua.Variant(value)
63 1
        return field_vars
64
65
66 1
class SubscriptionItemData(object):
67
    """
68
    To store useful data from a monitored item
69
    """
70 1
    def __init__(self):
71 1
        self.node = None
72 1
        self.client_handle = None
73 1
        self.server_handle = None
74 1
        self.attribute = None
75 1
        self.mfilter = None
76
77
78 1
class DataChangeNotif(object):
79
    """
80
    To be send to clients for every datachange notification from server
81
    """
82 1
    def __init__(self, subscription_data, monitored_item):
83 1
        self.monitored_item = monitored_item
84 1
        self.subscription_data = subscription_data
85
86 1
    def __str__(self):
87
        return "DataChangeNotification({}, {})".format(self.subscription_data, self.monitored_item)
88 1
    __repr__ = __str__
89
90
91 1
class Subscription(object):
92
    """
93
    Subscription object returned by Server or Client objects.
94
    The object represent a subscription to an opc-ua server.
95
    This is a high level class, especially subscribe_data_change
96
    and subscribe_events methods. If more control is necessary look at
97
    code and/or use create_monitored_items method.
98
    """
99
100 1
    def __init__(self, server, params, handler):
101 1
        self.logger = logging.getLogger(__name__)
102 1
        self.server = server
103 1
        self._client_handle = 200
104 1
        self._handler = handler
105 1
        self.parameters = params  # move to data class
106 1
        self._monitoreditems_map = {}
107 1
        self._lock = Lock()
108 1
        self.subscription_id = None
109 1
        response = self.server.create_subscription(params, self.publish_callback)
110 1
        self.subscription_id = response.SubscriptionId  # move to data class
111 1
        self.server.publish()
112 1
        self.server.publish()
113
114 1
    def delete(self):
115
        """
116
        Delete subscription on server. This is automatically done by Client and Server classes on exit
117
        """
118 1
        results = self.server.delete_subscriptions([self.subscription_id])
119 1
        results[0].check()
120
121 1
    def publish_callback(self, publishresult):
122 1
        self.logger.info("Publish callback called with result: %s", publishresult)
123 1
        while self.subscription_id is None:
124 1
            time.sleep(0.01)
125
126 1
        for notif in publishresult.NotificationMessage.NotificationData:
127 1
            if isinstance(notif, ua.DataChangeNotification):
128 1
                self._call_datachange(notif)
129 1
            elif isinstance(notif, ua.EventNotificationList):
130 1
                self._call_event(notif)
131
            elif isinstance(notif, ua.StatusChangeNotification):
132
                self._call_status(notif)
133
            else:
134
                self.logger.warning("Notification type not supported yet for notification %s", notif)
135
136 1
        ack = ua.SubscriptionAcknowledgement()
137 1
        ack.SubscriptionId = self.subscription_id
138 1
        ack.SequenceNumber = publishresult.NotificationMessage.SequenceNumber
139 1
        self.server.publish([ack])
140
141 1
    def _call_datachange(self, datachange):
142 1
        for item in datachange.MonitoredItems:
143 1
            with self._lock:
144 1
                if item.ClientHandle not in self._monitoreditems_map:
145
                    self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle)
146
                    continue
147 1
                data = self._monitoreditems_map[item.ClientHandle]
148 1
            if hasattr(self._handler, "datachange_notification"):
149 1
                event_data = DataChangeNotif(data, item)
150 1
                try:
151 1
                    self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
152
                except Exception:
153
                    self.logger.exception("Exception calling data change handler")
154 1
            elif hasattr(self._handler, "data_change"):  # deprecated API
155 1
                self.logger.warning("data_change method is deprecated, use datachange_notification")
156 1
                try:
157 1
                    self._handler.data_change(data.server_handle, data.node, item.Value.Value.Value, data.attribute)
158
                except Exception:
159
                    self.logger.exception("Exception calling deprecated data change handler")
160
            else:
161
                self.logger.error("DataChange subscription created but handler has no datachange_notification method")
162
163 1
    def _call_event(self, eventlist):
164 1
        for event in eventlist.Events:
165 1
            with self._lock:
166 1
                data = self._monitoreditems_map[event.ClientHandle]
167 1
            result = EventResult()
168 1
            result.server_handle = data.server_handle
169 1
            for idx, sattr in enumerate(data.mfilter.SelectClauses):
170 1
                if len(sattr.BrowsePath) == 0:
171
                    setattr(result, sattr.AttributeId.name, event.EventFields[idx].Value)
172
                else:
173 1
                    setattr(result, sattr.BrowsePath[0].Name, event.EventFields[idx].Value)
174 1
            if hasattr(self._handler, "event_notification"):
175 1
                try:
176 1
                    self._handler.event_notification(result)
177
                except Exception:
178
                    self.logger.exception("Exception calling event handler")
179
            elif hasattr(self._handler, "event"):  # depcrecated API
180
                try:
181
                    self._handler.event(data.server_handle, result)
182
                except Exception:
183
                    self.logger.exception("Exception calling deprecated event handler")
184
            else:
185
                self.logger.error("Event subscription created but handler has no event_notification method")
186
187 1
    def _call_status(self, status):
188
        try:
189
            self._handler.status_change_notification(status.Status)
190
        except Exception:
191
            self.logger.exception("Exception calling status change handler")
192
193 1
    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value):
194
        """
195
        Subscribe for data change events for a node or list of nodes.
196
        default attribute is Value.
197
        Return a handle which can be used to unsubscribe
198
        If more control is necessary use create_monitored_items method
199
        """
200 1
        return self._subscribe(nodes, attr, queuesize=0)
201
202 1
    def _get_node(self, nodeid):
203 1
        if isinstance(nodeid, ua.NodeId):
204
            node = Node(self.server, nodeid)
205 1
        elif isinstance(nodeid, Node):
206 1
            node = nodeid
207
        else:
208 1
            node = Node(self.server, ua.NodeId(nodeid))
209 1
        return node
210
211 1
    def _get_filter_from_event_type(self, eventtype):
212 1
        eventtype = self._get_node(eventtype)
213 1
        evfilter = ua.EventFilter()
214 1
        for property in get_event_properties_from_type_node(eventtype):
215 1
            op = ua.SimpleAttributeOperand()
216 1
            op.TypeDefinitionId = eventtype.nodeid
217 1
            op.AttributeId = ua.AttributeIds.Value
218 1
            op.BrowsePath = [property.get_browse_name()]
219 1
            evfilter.SelectClauses.append(op)
220 1
        return evfilter
221
222 1
    def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtype=ua.ObjectIds.BaseEventType):
223
        """
224
        Subscribe to events from a node. Default node is Server node.
225
        In most servers the server node is the only one you can subscribe to.
226
        Return a handle which can be used to unsubscribe
227
        """
228 1
        sourcenode = self._get_node(sourcenode)
229 1
        evfilter = self._get_filter_from_event_type(evtype)
230 1
        return self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter)
231
232 1
    def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
233 1
        is_list = True
234 1
        if not type(nodes) in (list, tuple):
235 1
            is_list = False
236 1
            nodes = [nodes]
237 1
        mirs = []
238 1
        for node in nodes:
239 1
            mir = self._make_monitored_item_request(node, attr, mfilter, queuesize)
240 1
            mirs.append(mir)
241
242 1
        mids = self.create_monitored_items(mirs)
243 1
        if is_list:
244 1
            return mids
245 1
        if type(mids[0]) == ua.StatusCode:
246 1
            mids[0].check()
247 1
        return mids[0]
248
249 1
    def _make_monitored_item_request(self, node, attr, mfilter, queuesize):
250 1
        rv = ua.ReadValueId()
251 1
        rv.NodeId = node.nodeid
252 1
        rv.AttributeId = attr
253
        # rv.IndexRange //We leave it null, then the entire array is returned
254 1
        mparams = ua.MonitoringParameters()
255 1
        with self._lock:
256 1
            self._client_handle += 1
257 1
            mparams.ClientHandle = self._client_handle
258 1
        mparams.SamplingInterval = self.parameters.RequestedPublishingInterval
259 1
        mparams.QueueSize = queuesize
260 1
        mparams.DiscardOldest = True
261 1
        if mfilter:
262 1
            mparams.Filter = mfilter
263 1
        mir = ua.MonitoredItemCreateRequest()
264 1
        mir.ItemToMonitor = rv
265 1
        mir.MonitoringMode = ua.MonitoringMode.Reporting
266 1
        mir.RequestedParameters = mparams
267 1
        return mir
268
269 1
    def create_monitored_items(self, monitored_items):
270
        """
271
        low level method to have full control over subscription parameters
272
        Client handle must be unique since it will be used as key for internal registration of data
273
        """
274 1
        params = ua.CreateMonitoredItemsParameters()
275 1
        params.SubscriptionId = self.subscription_id
276 1
        params.ItemsToCreate = monitored_items
277 1
        params.TimestampsToReturn = ua.TimestampsToReturn.Neither
278
279
        # insert monitored item into map to avoid notification arrive before result return
280
        # server_handle is left as None in purpose as we don't get it yet.
281 1
        with self._lock:
282 1
            for mi in monitored_items:
283 1
                data = SubscriptionItemData()
284 1
                data.client_handle = mi.RequestedParameters.ClientHandle
285 1
                data.node = Node(self.server, mi.ItemToMonitor.NodeId)
286 1
                data.attribute = mi.ItemToMonitor.AttributeId
287 1
                data.mfilter = mi.RequestedParameters.Filter
288 1
                self._monitoreditems_map[mi.RequestedParameters.ClientHandle] = data
289 1
        results = self.server.create_monitored_items(params)
290 1
        mids = []
291
        # process result, add server_handle, or remove it if failed
292 1
        with self._lock:
293 1
            for idx, result in enumerate(results):
294 1
                mi = params.ItemsToCreate[idx]
295 1
                if not result.StatusCode.is_good():
296 1
                    del self._monitoreditems_map[mi.RequestedParameters.ClientHandle]
297 1
                    mids.append(result.StatusCode)
298 1
                    continue
299 1
                data = self._monitoreditems_map[mi.RequestedParameters.ClientHandle]
300 1
                data.server_handle = result.MonitoredItemId
301 1
                mids.append(result.MonitoredItemId)
302 1
        return mids
303
304 1
    def unsubscribe(self, handle):
305
        """
306
        unsubscribe to datachange or events using the handle returned while subscribing
307
        if you delete subscription, you do not need to unsubscribe
308
        """
309 1
        params = ua.DeleteMonitoredItemsParameters()
310 1
        params.SubscriptionId = self.subscription_id
311 1
        params.MonitoredItemIds = [handle]
312 1
        results = self.server.delete_monitored_items(params)
313 1
        results[0].check()
314 1
        with self._lock:
315 1 View Code Duplication
            for k, v in self._monitoreditems_map.items():
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
316 1
                if v.server_handle == handle:
317 1
                    del(self._monitoreditems_map[k])
318 1
                    return
319
320
321 1
def get_event_properties_from_type_node(node):
322 1
    properties = []
323 1
    curr_node = node
324
325 1
    while True:
326 1
        properties.extend(curr_node.get_properties())
327
328 1
        if curr_node.nodeid.Identifier == ua.ObjectIds.BaseEventType:
329 1
            break
330
331 1
        parents = curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
332 1
        if len(parents) != 1:  # Something went wrong
333
            return None
334 1
        curr_node = parents[0]
335
336
    return properties
337