Completed
Pull Request — master (#188)
by Olivier
03:33
created

Subscription   B

Complexity

Total Complexity 46

Size/Duplication

Total Lines 205
Duplicated Lines 0 %

Test Coverage

Coverage 83.22%

Importance

Changes 7
Bugs 0 Features 0
Metric Value
dl 0
loc 205
ccs 124
cts 149
cp 0.8322
rs 8.3999
c 7
b 0
f 0
wmc 46

12 Methods

Rating   Name   Duplication   Size   Complexity  
A delete() 0 6 1
A _make_monitored_item_request() 0 19 3
B _call_event() 0 18 7
A subscribe_events() 0 11 2
B create_monitored_items() 0 34 6
C _call_datachange() 0 21 8
B publish_callback() 0 19 6
A unsubscribe() 0 15 4
A subscribe_data_change() 0 8 1
A _call_status() 0 5 2
B _subscribe() 0 16 5
A __init__() 0 13 1

How to fix   Complexity   

Complex Class

Complex classes like Subscription often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
high level interface to subscriptions
3
"""
4 1
import time
5 1
import logging
6 1
from threading import Lock
7
8 1
from opcua import ua
9 1
from opcua.common import events
10
from opcua import Node
11
12 1
13
class SubHandler(object):
    """
    Example handler for the notifications delivered through a subscription.

    It only illustrates the expected interface: any object exposing methods
    with these names can be used as a handler. Every method here does nothing.
    """

    def data_change(self, handle, node, val, attr):
        """
        Deprecated callback, implement datachange_notification instead
        """

    def datachange_notification(self, node, val, data):
        """
        Invoked for each data change notification sent by the server
        """

    def event_notification(self, event):
        """
        Invoked for each event notification sent by the server
        """

    def status_change_notification(self, status):
        """
        Invoked for each status change notification sent by the server
        """
42
43 1
44
class SubscriptionItemData(object):
    """
    Bookkeeping record holding what is known about one monitored item
    """

    def __init__(self):
        # every field starts unset; whoever registers the item fills them in
        self.node = self.client_handle = self.server_handle = None
        self.attribute = self.mfilter = None
54
55 1
56
class DataChangeNotif(object):
    """
    Object handed to clients together with each data change notification
    received from the server
    """

    def __init__(self, subscription_data, monitored_item):
        self.monitored_item = monitored_item
        self.subscription_data = subscription_data

    def __str__(self):
        template = "DataChangeNotification({}, {})"
        return template.format(self.subscription_data, self.monitored_item)

    # repr and str deliberately share one implementation
    __repr__ = __str__
67
68
69
class Subscription(object):
70 1
    """
71 1
    Subscription object returned by Server or Client objects.
72 1
    The object represent a subscription to an opc-ua server.
73 1
    This is a high level class, especially subscribe_data_change
74 1
    and subscribe_events methods. If more control is necessary look at
75 1
    code and/or use create_monitored_items method.
76
    """
77
78 1
    def __init__(self, server, params, handler):
        """
        Create the subscription on the server and start publishing.

        server: object providing create_subscription() and publish()
        params: subscription parameters, kept as self.parameters
        handler: object receiving the notifications
        """
        self.logger = logging.getLogger(__name__)
        self.server = server
        self._client_handle = 200
        self._handler = handler
        self.parameters = params  # move to data class
        self._monitoreditems_map = {}
        self._lock = Lock()
        # must exist before create_subscription is called: publish_callback
        # polls this attribute until it is no longer None
        self.subscription_id = None
        response = server.create_subscription(params, self.publish_callback)
        self.subscription_id = response.SubscriptionId  # move to data class
        # two publish requests are issued up front
        # NOTE(review): presumably to keep more than one request outstanding - confirm
        for _ in range(2):
            server.publish()
91 1
92
    def delete(self):
        """
        Remove this subscription from the server.
        Client and Server classes already do this automatically on exit
        """
        # only one id is sent, so only the first result is relevant
        # NOTE(review): check() is expected to raise on a bad status - confirm
        status = self.server.delete_subscriptions([self.subscription_id])[0]
        status.check()
98
99
    def publish_callback(self, publishresult):
        """
        Dispatch every notification contained in a publish result to the
        matching internal method, then acknowledge the message to the server
        """
        self.logger.info("Publish callback called with result: %s", publishresult)
        # __init__ stores the id only after create_subscription has returned,
        # so wait here until it is available
        while self.subscription_id is None:
            time.sleep(0.01)

        message = publishresult.NotificationMessage
        # checked in this order, the first matching type wins
        dispatch = (
            (ua.DataChangeNotification, self._call_datachange),
            (ua.EventNotificationList, self._call_event),
            (ua.StatusChangeNotification, self._call_status),
        )
        for notif in message.NotificationData:
            for notif_type, callback in dispatch:
                if isinstance(notif, notif_type):
                    callback(notif)
                    break
            else:
                self.logger.warning("Notification type not supported yet for notification %s", notif)

        ack = ua.SubscriptionAcknowledgement()
        ack.SubscriptionId = self.subscription_id
        ack.SequenceNumber = message.SequenceNumber
        self.server.publish([ack])
118 1
119 1
    def _call_datachange(self, datachange):
        """
        Forward each monitored item of a data change notification to the handler.

        datachange_notification is preferred, the deprecated data_change is
        used as fallback. Items with an unregistered client handle are skipped
        with a warning. Exceptions raised by the handler are logged, not propagated.
        """
        for item in datachange.MonitoredItems:
            try:
                with self._lock:
                    data = self._monitoreditems_map[item.ClientHandle]
            except KeyError:
                self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle)
                continue

            handler = self._handler
            if hasattr(handler, "datachange_notification"):
                notif = DataChangeNotif(data, item)
                try:
                    handler.datachange_notification(data.node, item.Value.Value.Value, notif)
                except Exception:
                    self.logger.exception("Exception calling data change handler")
                continue

            if hasattr(handler, "data_change"):  # deprecated API
                self.logger.warning("data_change method is deprecated, use datachange_notification")
                try:
                    handler.data_change(data.server_handle, data.node, item.Value.Value.Value, data.attribute)
                except Exception:
                    self.logger.exception("Exception calling deprecated data change handler")
                continue

            self.logger.error("DataChange subscription created but handler has no datachange_notification method")
140
141 1
    def _call_event(self, eventlist):
        """
        Forward each event of an event notification list to the handler.

        event_notification is preferred, the deprecated event method is used
        as fallback. Events whose client handle is not registered are skipped
        with a warning, the same way data change notifications are handled,
        so that one stale event cannot abort processing of the whole publish
        result. Exceptions raised by the handler are logged, not propagated.
        """
        for event in eventlist.Events:
            with self._lock:
                data = self._monitoreditems_map.get(event.ClientHandle)
            if data is None:
                # e.g. the item was removed while the notification was in flight
                self.logger.warning("Received a notification for unknown handle: %s", event.ClientHandle)
                continue
            result = events.event_obj_from_event_fields(data.mfilter.SelectClauses, event.EventFields)
            result.server_handle = data.server_handle
            if hasattr(self._handler, "event_notification"):
                try:
                    self._handler.event_notification(result)
                except Exception:
                    self.logger.exception("Exception calling event handler")
            elif hasattr(self._handler, "event"):  # deprecated API
                try:
                    self._handler.event(data.server_handle, result)
                except Exception:
                    self.logger.exception("Exception calling deprecated event handler")
            else:
                self.logger.error("Event subscription created but handler has no event_notification method")
159
160
    def _call_status(self, status):
        """
        Pass the status of a status change notification to the handler.
        Any exception, including a handler lacking the method, is logged
        """
        try:
            notify = self._handler.status_change_notification
            notify(status.Status)
        except Exception:
            self.logger.exception("Exception calling status change handler")
165 1
166 1
    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value):
        """
        Subscribe to data changes of one node or of a list of nodes.
        The monitored attribute defaults to Value.
        The returned handle (one per node when a list is given) can be
        passed to unsubscribe.
        Use create_monitored_items when more control is required
        """
        handles = self._subscribe(nodes, attr, queuesize=0)
        return handles
174 1
175 1
    def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtype=ua.ObjectIds.BaseEventType, evfilter=None):
        """
        Subscribe to the events emitted by a node, the Server node by default.
        On most servers the server node is the only one accepting event subscriptions.
        evtype is only used to build a filter when evfilter is not given.
        The returned handle can be passed to unsubscribe
        """
        source = Node(self.server, sourcenode)
        if evfilter is None:
            evtype_node = Node(self.server, evtype)
            evfilter = events.get_filter_from_event_type(evtype_node)
        return self._subscribe(source, ua.AttributeIds.EventNotifier, evfilter)
186
187 1
    def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
        """
        Create one monitored item per node and return the resulting handles.

        nodes: a single node or a list/tuple of nodes (subclasses included)
        attr: attribute id to monitor on every node
        mfilter: optional filter attached to every monitored item
        queuesize: queue size requested for every monitored item

        For a list or tuple, the list produced by create_monitored_items is
        returned unchanged, so it may contain status codes for failed items.
        For a single node, a status code result is checked first and the
        single entry is returned.
        """
        is_list = isinstance(nodes, (list, tuple))
        if not is_list:
            nodes = [nodes]
        mirs = [self._make_monitored_item_request(node, attr, mfilter, queuesize) for node in nodes]

        mids = self.create_monitored_items(mirs)
        if is_list:
            return mids
        if isinstance(mids[0], ua.StatusCode):
            # NOTE(review): check() is expected to raise for a bad status - confirm
            mids[0].check()
        return mids[0]
203 1
204
    def _make_monitored_item_request(self, node, attr, mfilter, queuesize):
        """
        Build the request used to create a monitored item for one attribute
        of a node, allocating a fresh client handle for it
        """
        target = ua.ReadValueId()
        target.NodeId = node.nodeid
        target.AttributeId = attr
        # IndexRange is left unset, then the entire array is returned

        monitoring = ua.MonitoringParameters()
        # the handle counter is shared state, allocate under the lock
        with self._lock:
            self._client_handle += 1
            monitoring.ClientHandle = self._client_handle
        monitoring.SamplingInterval = self.parameters.RequestedPublishingInterval
        monitoring.QueueSize = queuesize
        monitoring.DiscardOldest = True
        if mfilter:
            monitoring.Filter = mfilter

        request = ua.MonitoredItemCreateRequest()
        request.ItemToMonitor = target
        request.MonitoringMode = ua.MonitoringMode.Reporting
        request.RequestedParameters = monitoring
        return request
223
224
    def create_monitored_items(self, monitored_items):
        """
        low level method to have full control over subscription parameters.
        Client handle must be unique since it will be used as key for internal registration of data.

        monitored_items: sequence of monitored item create requests
        Return a list with, for every request, the monitored item id on
        success or the status code on failure.
        If the server call raises, the entries registered for this request
        are removed again and the exception is re-raised.
        """
        params = ua.CreateMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToCreate = monitored_items
        params.TimestampsToReturn = ua.TimestampsToReturn.Neither

        # insert monitored items into the map before sending the request, so
        # that a notification arriving before the result is not unknown.
        # server_handle is left as None on purpose as we don't have it yet.
        with self._lock:
            for mi in monitored_items:
                data = SubscriptionItemData()
                data.client_handle = mi.RequestedParameters.ClientHandle
                data.node = Node(self.server, mi.ItemToMonitor.NodeId)
                data.attribute = mi.ItemToMonitor.AttributeId
                data.mfilter = mi.RequestedParameters.Filter
                self._monitoreditems_map[mi.RequestedParameters.ClientHandle] = data
        try:
            results = self.server.create_monitored_items(params)
        except Exception:
            # the request failed as a whole: do not leave stale entries behind
            with self._lock:
                for mi in monitored_items:
                    self._monitoreditems_map.pop(mi.RequestedParameters.ClientHandle, None)
            raise

        mids = []
        # process results: store the server handle, or drop the entry on failure
        with self._lock:
            for mi, result in zip(monitored_items, results):
                client_handle = mi.RequestedParameters.ClientHandle
                if not result.StatusCode.is_good():
                    self._monitoreditems_map.pop(client_handle, None)
                    mids.append(result.StatusCode)
                    continue
                self._monitoreditems_map[client_handle].server_handle = result.MonitoredItemId
                mids.append(result.MonitoredItemId)
        return mids
258 1
259 1
    def unsubscribe(self, handle):
        """
        Stop a data change or event subscription, using the handle that was
        returned when subscribing.
        Not needed when the whole subscription is deleted
        """
        request = ua.DeleteMonitoredItemsParameters()
        request.SubscriptionId = self.subscription_id
        request.MonitoredItemIds = [handle]
        # a single id is sent, so only the first result is relevant
        status = self.server.delete_monitored_items(request)[0]
        status.check()
        with self._lock:
            # locate the first entry registered with this server handle
            for client_handle, data in self._monitoreditems_map.items():
                if data.server_handle == handle:
                    break
            else:
                return
            del self._monitoreditems_map[client_handle]
274 1
275
276