Passed
Branch dev (949ba8)
by Olivier
03:17 queued 01:08
created

opcua.Subscription   B

Complexity

Total Complexity 44

Size/Duplication

Total Lines 211
Duplicated Lines 0 %

Test Coverage

Coverage 90.07%
Metric Value
wmc 44
dl 0
loc 211
ccs 136
cts 151
cp 0.9007
rs 8.3396

14 Methods

Rating   Name   Duplication   Size   Complexity  
A __init__() 0 13 1
A delete() 0 6 1
B create_monitored_items() 0 32 4
A _get_filter_from_event_type() 0 10 2
A _call_status() 0 5 2
A unsubscribe() 0 15 4
A _get_node() 0 8 3
A subscribe_data_change() 0 8 1
B _call_event() 0 18 6
B _subscribe() 0 16 5
A _make_monitored_item_request() 0 19 3
B _call_datachange() 0 11 5
A subscribe_events() 0 9 1
B publish_callback() 0 19 6

How to fix   Complexity   

Complex Class

Complex classes like opcua.Subscription often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
high level interface to subscriptions
3
"""
4 1
import io
5 1
import time
6 1
import logging
7 1
from threading import Lock
8
9 1
from opcua import ua
10 1
from opcua import Node
11 1
from opcua import ObjectIds
12 1
from opcua import AttributeIds
13
#from opcua import Event
14
15
16 1
class EventResult():
17
18 1
    def __str__(self):
19
        return "EventResult({})".format([str(k) + ":" + str(v) for k, v in self.__dict__.items()])
20 1
    __repr__ = __str__
21
22
23 1
class SubscriptionItemData():
24
25 1
    def __init__(self):
26 1
        self.node = None
27 1
        self.client_handle = None
28 1
        self.server_handle = None
29 1
        self.attribute = None
30 1
        self.mfilter = None
31
32
33 1
class Subscription(object):
34
    """
35
    Subscription object returned by Server or Client objects.
36
    The object represent a subscription to an opc-ua server.
37
    This is a high level class, especially subscribe_data_change 
38
    and subscribe_events methods. If more control is necessary look at
39
    code and/or use create_monitored_items method.
40
    """
41
42 1
    def __init__(self, server, params, handler):
43 1
        self.logger = logging.getLogger(__name__)
44 1
        self.server = server
45 1
        self._client_handle = 200
46 1
        self._handler = handler
47 1
        self.parameters = params  # move to data class
48 1
        self._monitoreditems_map = {}
49 1
        self._lock = Lock()
50 1
        self.subscription_id = None
51 1
        response = self.server.create_subscription(params, self.publish_callback)
52 1
        self.subscription_id = response.SubscriptionId  # move to data class
53 1
        self.server.publish()
54 1
        self.server.publish()
55
56 1
    def delete(self):
57
        """
58
        Delete subscription on server. This is automatically done by Client and Server classes on exit
59
        """
60 1
        results = self.server.delete_subscriptions([self.subscription_id])
61 1
        results[0].check()
62
63 1
    def publish_callback(self, publishresult):
64 1
        self.logger.info("Publish callback called with result: %s", publishresult)
65 1
        while self.subscription_id is None:
66
            time.sleep(0.01)
67
68 1
        for notif in publishresult.NotificationMessage.NotificationData:
69 1
            if isinstance(notif, ua.DataChangeNotification):
70 1
                self._call_datachange(notif)
71 1
            elif isinstance(notif, ua.EventNotificationList):
72 1
                self._call_event(notif)
73
            elif isinstance(notif, ua.StatusChangeNotification):
74
                self._call_status(notif)
75
            else:
76
                self.logger.warning("Notification type not supported yet for notification %s", notif)
77
78 1
        ack = ua.SubscriptionAcknowledgement()
79 1
        ack.SubscriptionId = self.subscription_id
80 1
        ack.SequenceNumber = publishresult.NotificationMessage.SequenceNumber
81 1
        self.server.publish([ack])
82
83 1
    def _call_datachange(self, datachange):
84 1
        for item in datachange.MonitoredItems:
85 1
            with self._lock:
86 1
                if item.ClientHandle not in self._monitoreditems_map:
87 1
                    self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle)
88 1
                    continue
89 1
                data = self._monitoreditems_map[item.ClientHandle]
90 1
            try:
91 1
                self._handler.data_change(data.server_handle, data.node, item.Value.Value.Value, data.attribute)
92
            except Exception:
93
                self.logger.exception("Exception calling data change handler")
94
95 1
    def _call_event(self, eventlist):
96 1
        for event in eventlist.Events:
97 1
            with self._lock:
98 1
                data = self._monitoreditems_map[event.ClientHandle]
99 1
            try:
100
                #fields = {}
101 1
                result = EventResult()
102 1
                for idx, sattr in enumerate(data.mfilter.SelectClauses):
103
104 1
                    if len(sattr.BrowsePath) == 0:
105
                        #fields[ua.AttributeIdsInv[sattr.AttributeId]] = event.EventFields[idx].Value
106
                        setattr(result, ua.AttributeIdsInv[sattr.AttributeId], event.EventFields[idx].Value)
107
                    else:
108 1
                        setattr(result, sattr.BrowsePath[0].Name, event.EventFields[idx].Value)
109
                #self._handler.event(data.server_handle, fields)
110 1
                self._handler.event(data.server_handle, result)
111
            except Exception:
112
                self.logger.exception("Exception calling event handler")
113
114 1
    def _call_status(self, status):
115
        try:
116
            self._handler.status_change(status.Status)
117
        except Exception:
118
            self.logger.exception("Exception calling status change handler")
119
120 1
    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value):
121
        """
122
        Subscribe for data change events for a node or list of nodes.
123
        default attribute is Value.
124
        Return a handle which can be used to unsubscribe
125
        If more control is necessary use create_monitored_items method
126
        """
127 1
        return self._subscribe(nodes, attr, queuesize=1)
128
129 1
    def _get_node(self, nodeid):
130 1
        if isinstance(nodeid, ua.NodeId):
131
            node = Node(self.server, nodeid)
132 1
        elif isinstance(nodeid, Node):
133
            node = nodeid
134
        else:
135 1
            node = Node(self.server, ua.NodeId(nodeid))
136 1
        return node
137
138 1
    def _get_filter_from_event_type(self, eventtype):
139 1
        eventtype = self._get_node(eventtype)
140 1
        evfilter = ua.EventFilter()
141 1
        for desc in eventtype.get_children_descriptions(refs=ua.ObjectIds.HasProperty, nodeclassmask=ua.NodeClass.Variable):
142 1
            op = ua.SimpleAttributeOperand()
143 1
            op.TypeDefinitionId = eventtype.nodeid
144 1
            op.AttributeId = AttributeIds.Value
145 1
            op.BrowsePath = [desc.BrowseName]
146 1
            evfilter.SelectClauses.append(op)
147 1
        return evfilter
148
149 1
    def subscribe_events(self, sourcenode=ObjectIds.Server, evtype=ObjectIds.BaseEventType):
150
        """
151
        Subscribe to events from a node. Default node is Server node. 
152
        In most servers the server node is the only one you can subscribe to.
153
        Return a handle which can be used to unsubscribe
154
        """
155 1
        sourcenode = self._get_node(sourcenode)
156 1
        evfilter = self._get_filter_from_event_type(evtype)
157 1
        return self._subscribe(sourcenode, AttributeIds.EventNotifier, evfilter)
158
159 1
    def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
160 1
        is_list = True
161 1
        if not type(nodes) in (list, tuple):
162 1
            is_list = False
163 1
            nodes = [nodes]
164 1
        mirs = []
165 1
        for node in nodes:
166 1
            mir = self._make_monitored_item_request(node, attr, mfilter, queuesize)
167 1
            mirs.append(mir)
168
169 1
        mids = self.create_monitored_items(mirs)
170 1
        if is_list:
171 1
            return mids
172 1
        if type(mids[0]) == ua.StatusCode:
173 1
            mids[0].check()
174 1
        return mids[0]
175
176 1
    def _make_monitored_item_request(self, node, attr, mfilter, queuesize):
177 1
        rv = ua.ReadValueId()
178 1
        rv.NodeId = node.nodeid
179 1
        rv.AttributeId = attr
180
        # rv.IndexRange //We leave it null, then the entire array is returned
181 1
        mparams = ua.MonitoringParameters()
182 1
        with self._lock:
183 1
            self._client_handle += 1
184 1
            mparams.ClientHandle = self._client_handle
185 1
        mparams.SamplingInterval = self.parameters.RequestedPublishingInterval
186 1
        mparams.QueueSize = queuesize
187 1
        mparams.DiscardOldest = True
188 1
        if mfilter:
189 1
            mparams.Filter = mfilter
190 1
        mir = ua.MonitoredItemCreateRequest()
191 1
        mir.ItemToMonitor = rv
192 1
        mir.MonitoringMode = ua.MonitoringMode.Reporting
193 1
        mir.RequestedParameters = mparams
194 1
        return mir
195
196 1
    def create_monitored_items(self, monitored_items):
197
        """
198
        low level method to have full control over subscription parameters
199
        Client handle must be unique since it will be used as key for internal registration of data
200
        """
201 1
        params = ua.CreateMonitoredItemsParameters()
202 1
        params.SubscriptionId = self.subscription_id
203 1
        params.ItemsToCreate = monitored_items
204 1
        params.TimestampsToReturn = ua.TimestampsToReturn.Neither
205
        
206 1
        mids = []
207 1
        results = self.server.create_monitored_items(params)
208
        # FIXME: Race condition here
209
        # We lock as early as possible. But in some situation, a notification may arrives before
210
        # locking and we will not be able to prosess it. To avoid issues, users should subscribe 
211
        # to all nodes at once
212 1
        with self._lock:  
213 1
            for idx, result in enumerate(results):
214 1
                mi = params.ItemsToCreate[idx]
215 1
                if not result.StatusCode.is_good():
216 1
                    mids.append(result.StatusCode)
217 1
                    continue
218 1
                data = SubscriptionItemData()
219 1
                data.client_handle = mi.RequestedParameters.ClientHandle
220 1
                data.node = Node(self.server, mi.ItemToMonitor.NodeId)
221 1
                data.attribute = mi.ItemToMonitor.AttributeId
222 1
                data.server_handle = result.MonitoredItemId
223 1
                data.mfilter = result.FilterResult
224 1
                self._monitoreditems_map[mi.RequestedParameters.ClientHandle] = data
225
226 1
                mids.append(result.MonitoredItemId)
227 1
        return mids
228
229 1
    def unsubscribe(self, handle):
230
        """
231
        unsubscribe to datachange or events using the handle returned while subscribing
232
        if you delete subscription, you do not need to unsubscribe
233
        """
234 1
        params = ua.DeleteMonitoredItemsParameters()
235 1
        params.SubscriptionId = self.subscription_id
236 1
        params.MonitoredItemIds = [handle]
237 1
        results = self.server.delete_monitored_items(params)
238 1
        results[0].check()
239 1
        with self._lock:
240 1
            for k, v in self._monitoreditems_map.items():
241 1
                if v.server_handle == handle:
242 1
                    del(self._monitoreditems_map[k])
243 1
                    return
244
245