Passed
Push — dev ( 97f75d...d1bef8 )
by Olivier
03:40
created

status_change_notification()   A

Complexity

Conditions 1

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.125
Metric Value
cc 1
dl 0
loc 5
ccs 1
cts 2
cp 0.5
crap 1.125
rs 9.4285
1
"""
2
high level interface to subscriptions
3
"""
4 1
import time
5 1
import logging
6 1
from threading import Lock
7
8 1
from opcua import ua
9 1
from opcua import Node
10
11
12 1
class SubHandler(object):
13
    """
14
    Subscription Handler. To receive events from server for a subscription
15
    This class is just a sample class. Whatever class having these methods can be used
16
    """
17
18 1
    def data_change(self, handle, node, val, attr):
19
        """
20
        Deprecated, use datachange_notification
21
        """
22
        pass
23
24 1
    def datachange_notification(self, node, val, data):
25
        """
26
        called for every datachange notfication from server
27
        """
28
        pass
29
30 1
    def event_notification(self, event):
31
        """
32
        called for every event notfication from server
33
        """
34
        pass
35
36 1
    def status_change_notification(self, status):
37
        """
38
        called for every status change notfication from server
39
        """
40
        pass
41
42 1
    def event(self, handle, event):
43
        """
44
        Deprecated use event_notification
45
        """
46
        pass
47
48
49 1
class EventResult():
50
    """
51
    To be sent to clients for every events from server
52
    """
53
54 1
    def __init__(self):
55 1
        self.server_handle = None
56
57 1
    def __str__(self):
58
        return "EventResult({})".format([str(k) + ":" + str(v) for k, v in self.__dict__.items()])
59 1
    __repr__ = __str__
60
61
62 1
class SubscriptionItemData():
63
    """
64
    To store usefull data from a monitored item
65
    """
66 1
    def __init__(self):
67 1
        self.node = None
68 1
        self.client_handle = None
69 1
        self.server_handle = None
70 1
        self.attribute = None
71 1
        self.mfilter = None
72
73
74 1
class DataChangeNotif():
75
    """
76
    To be send to clients for every datachange notification from server
77
    """
78 1
    def __init__(self, subscription_data, monitored_item):
79 1
        self.monitored_item = monitored_item
80 1
        self.subscription_data = subscription_data
81
82 1
    def __str__(self):
83
        return "DataChangeNotfication({}, {})".format(self.subscription_data, self.monitored_item)
84 1
    __repr__ = __str__
85
86
87 1
class Subscription(object):
88
    """
89
    Subscription object returned by Server or Client objects.
90
    The object represent a subscription to an opc-ua server.
91
    This is a high level class, especially subscribe_data_change
92
    and subscribe_events methods. If more control is necessary look at
93
    code and/or use create_monitored_items method.
94
    """
95
96 1
    def __init__(self, server, params, handler):
97 1
        self.logger = logging.getLogger(__name__)
98 1
        self.server = server
99 1
        self._client_handle = 200
100 1
        self._handler = handler
101 1
        self.parameters = params  # move to data class
102 1
        self._monitoreditems_map = {}
103 1
        self._lock = Lock()
104 1
        self.subscription_id = None
105 1
        response = self.server.create_subscription(params, self.publish_callback)
106 1
        self.subscription_id = response.SubscriptionId  # move to data class
107 1
        self.server.publish()
108 1
        self.server.publish()
109
110 1
    def delete(self):
111
        """
112
        Delete subscription on server. This is automatically done by Client and Server classes on exit
113
        """
114 1
        results = self.server.delete_subscriptions([self.subscription_id])
115 1
        results[0].check()
116
117 1
    def publish_callback(self, publishresult):
118 1
        self.logger.info("Publish callback called with result: %s", publishresult)
119 1
        while self.subscription_id is None:
120 1
            time.sleep(0.01)
121
122 1
        for notif in publishresult.NotificationMessage.NotificationData:
123 1
            if isinstance(notif, ua.DataChangeNotification):
124 1
                self._call_datachange(notif)
125 1
            elif isinstance(notif, ua.EventNotificationList):
126 1
                self._call_event(notif)
127
            elif isinstance(notif, ua.StatusChangeNotification):
128
                self._call_status(notif)
129
            else:
130
                self.logger.warning("Notification type not supported yet for notification %s", notif)
131
132 1
        ack = ua.SubscriptionAcknowledgement()
133 1
        ack.SubscriptionId = self.subscription_id
134 1
        ack.SequenceNumber = publishresult.NotificationMessage.SequenceNumber
135 1
        self.server.publish([ack])
136
137 1
    def _call_datachange(self, datachange):
138 1
        for item in datachange.MonitoredItems:
139 1
            with self._lock:
140 1
                if item.ClientHandle not in self._monitoreditems_map:
141 1
                    self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle)
142 1
                    continue
143 1
                data = self._monitoreditems_map[item.ClientHandle]
144 1
            if hasattr(self._handler, "datachange_notification"):
145 1
                event_data = DataChangeNotif(data, item)
146 1
                try:
147 1
                    self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
148
                except Exception:
149
                    self.logger.exception("Exception calling data change handler")
150 1
            elif hasattr(self._handler, "data_change"):  # deprecated API
151 1
                self.logger.warning("data_change method is deprecated, use datavalue_changed")
152 1
                try:
153 1
                    self._handler.data_change(data.server_handle, data.node, item.Value.Value.Value, data.attribute)
154
                except Exception:
155
                    self.logger.exception("Exception calling deprecated data change handler")
156
            else:
157
                self.logger.error("DataChange subscription created but handler has no datachange_notification method")
158
159 1
    def _call_event(self, eventlist):
160 1
        for event in eventlist.Events:
161 1
            with self._lock:
162 1
                data = self._monitoreditems_map[event.ClientHandle]
163 1
            result = EventResult()
164 1
            result.server_handle = data.server_handle
165 1
            for idx, sattr in enumerate(data.mfilter.SelectClauses):
166 1
                if len(sattr.BrowsePath) == 0:
167
                    setattr(result, sattr.AttributeId.name, event.EventFields[idx].Value)
168
                else:
169 1
                    setattr(result, sattr.BrowsePath[0].Name, event.EventFields[idx].Value)
170 1
            if hasattr(self._handler, "event_notification"):
171 1
                try:
172 1
                    self._handler.event_notification(result)
173
                except Exception:
174
                    self.logger.exception("Exception calling event handler")
175 1
            elif hasattr(self._handler, "event"):  # depcrecated API
176 1
                try:
177 1
                    self._handler.event(data.server_handle, result)
178
                except Exception:
179
                    self.logger.exception("Exception calling deprecated event handler")
180
            else:
181
                self.logger.error("Event subscription created but handler has no event_notification method")
182
183 1
    def _call_status(self, status):
184
        try:
185
            self._handler.status_change_notification(status.Status)
186
        except Exception:
187
            self.logger.exception("Exception calling status change handler")
188
189 1
    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value):
190
        """
191
        Subscribe for data change events for a node or list of nodes.
192
        default attribute is Value.
193
        Return a handle which can be used to unsubscribe
194
        If more control is necessary use create_monitored_items method
195
        """
196 1
        return self._subscribe(nodes, attr, queuesize=1)
197
198 1
    def _get_node(self, nodeid):
199 1
        if isinstance(nodeid, ua.NodeId):
200
            node = Node(self.server, nodeid)
201 1
        elif isinstance(nodeid, Node):
202 1
            node = nodeid
203
        else:
204 1
            node = Node(self.server, ua.NodeId(nodeid))
205 1
        return node
206
207 1
    def _get_filter_from_event_type(self, eventtype):
208 1
        eventtype = self._get_node(eventtype)
209 1
        evfilter = ua.EventFilter()
210 1
        for desc in eventtype.get_children_descriptions(refs=ua.ObjectIds.HasProperty, nodeclassmask=ua.NodeClass.Variable):
211 1
            op = ua.SimpleAttributeOperand()
212 1
            op.TypeDefinitionId = eventtype.nodeid
213 1
            op.AttributeId = ua.AttributeIds.Value
214 1
            op.BrowsePath = [desc.BrowseName]
215 1
            evfilter.SelectClauses.append(op)
216 1
        return evfilter
217
218 1
    def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtype=ua.ObjectIds.BaseEventType):
219
        """
220
        Subscribe to events from a node. Default node is Server node.
221
        In most servers the server node is the only one you can subscribe to.
222
        Return a handle which can be used to unsubscribe
223
        """
224 1
        sourcenode = self._get_node(sourcenode)
225 1
        evfilter = self._get_filter_from_event_type(evtype)
226 1
        return self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter)
227
228 1
    def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
229 1
        is_list = True
230 1
        if not type(nodes) in (list, tuple):
231 1
            is_list = False
232 1
            nodes = [nodes]
233 1
        mirs = []
234 1
        for node in nodes:
235 1
            mir = self._make_monitored_item_request(node, attr, mfilter, queuesize)
236 1
            mirs.append(mir)
237
238 1
        mids = self.create_monitored_items(mirs)
239 1
        if is_list:
240 1
            return mids
241 1
        if type(mids[0]) == ua.StatusCode:
242 1
            mids[0].check()
243 1
        return mids[0]
244
245 1
    def _make_monitored_item_request(self, node, attr, mfilter, queuesize):
246 1
        rv = ua.ReadValueId()
247 1
        rv.NodeId = node.nodeid
248 1
        rv.AttributeId = attr
249
        # rv.IndexRange //We leave it null, then the entire array is returned
250 1
        mparams = ua.MonitoringParameters()
251 1
        with self._lock:
252 1
            self._client_handle += 1
253 1
            mparams.ClientHandle = self._client_handle
254 1
        mparams.SamplingInterval = self.parameters.RequestedPublishingInterval
255 1
        mparams.QueueSize = queuesize
256 1
        mparams.DiscardOldest = True
257 1
        if mfilter:
258 1
            mparams.Filter = mfilter
259 1
        mir = ua.MonitoredItemCreateRequest()
260 1
        mir.ItemToMonitor = rv
261 1
        mir.MonitoringMode = ua.MonitoringMode.Reporting
262 1
        mir.RequestedParameters = mparams
263 1
        return mir
264
265 1
    def create_monitored_items(self, monitored_items):
266
        """
267
        low level method to have full control over subscription parameters
268
        Client handle must be unique since it will be used as key for internal registration of data
269
        """
270 1
        params = ua.CreateMonitoredItemsParameters()
271 1
        params.SubscriptionId = self.subscription_id
272 1
        params.ItemsToCreate = monitored_items
273 1
        params.TimestampsToReturn = ua.TimestampsToReturn.Neither
274
275 1
        mids = []
276 1
        results = self.server.create_monitored_items(params)
277
        # FIXME: Race condition here
278
        # We lock as early as possible. But in some situation, a notification may arrives before
279
        # locking and we will not be able to prosess it. To avoid issues, users should subscribe
280
        # to all nodes at once
281 1
        with self._lock:
282 1
            for idx, result in enumerate(results):
283 1
                mi = params.ItemsToCreate[idx]
284 1
                if not result.StatusCode.is_good():
285 1
                    mids.append(result.StatusCode)
286 1
                    continue
287 1
                data = SubscriptionItemData()
288 1
                data.client_handle = mi.RequestedParameters.ClientHandle
289 1
                data.node = Node(self.server, mi.ItemToMonitor.NodeId)
290 1
                data.attribute = mi.ItemToMonitor.AttributeId
291 1
                data.server_handle = result.MonitoredItemId
292
                #data.mfilter = result.FilterResult
293 1
                data.mfilter = mi.RequestedParameters.Filter
294 1
                self._monitoreditems_map[mi.RequestedParameters.ClientHandle] = data
295
296 1
                mids.append(result.MonitoredItemId)
297 1
        return mids
298
299 1
    def unsubscribe(self, handle):
300
        """
301
        unsubscribe to datachange or events using the handle returned while subscribing
302
        if you delete subscription, you do not need to unsubscribe
303
        """
304 1
        params = ua.DeleteMonitoredItemsParameters()
305 1
        params.SubscriptionId = self.subscription_id
306 1
        params.MonitoredItemIds = [handle]
307 1
        results = self.server.delete_monitored_items(params)
308 1
        results[0].check()
309 1
        with self._lock:
310 1
            for k, v in self._monitoreditems_map.items():
311 1
                if v.server_handle == handle:
312 1
                    del(self._monitoreditems_map[k])
313 1
                    return
314
315