Completed
Pull Request — master (#161)
by Denis
04:02
created

SubHandler.event()   A

Complexity

Conditions 1

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.125
Metric Value
cc 1
dl 0
loc 5
rs 9.4285
ccs 1
cts 2
cp 0.5
crap 1.125
1
"""
2
high level interface to subscriptions
3
"""
4 1
import time
5 1
import logging
6 1
from threading import Lock
7
8 1
from opcua import ua
9 1
from opcua import Node
10 1
from opcua.common import event
11
12
13 1
class SubHandler(object):
    """
    Sample subscription handler, used to receive notifications from a server.

    This class only illustrates the expected interface: any object
    exposing these methods can be given to a Subscription instead.
    Every method here is a no-op meant to be overridden.
    """

    def data_change(self, handle, node, val, attr):
        """
        Deprecated, use datachange_notification.

        Subscription only falls back to this method when the handler
        has no datachange_notification method.
        """

    def datachange_notification(self, node, val, data):
        """
        Called for every datachange notification from the server.

        node is the monitored node, val the new value and data a
        DataChangeNotif object carrying the raw notification.
        """

    def event_notification(self, event):
        """
        Called for every event notification from the server.

        event is an EventResult object.
        """

    def status_change_notification(self, status):
        """
        Called for every status change notification from the server.
        """
42
43
44 1
class EventResult(object):
    """
    Container handed to clients for every event received from the server.

    Besides server_handle, the event fields are attached as plain
    attributes at runtime, which is why the string form walks __dict__.
    """

    def __init__(self):
        self.server_handle = None

    def __str__(self):
        # One "name:value" entry per attribute currently set on the instance.
        entries = []
        for name, value in self.__dict__.items():
            entries.append(":".join((str(name), str(value))))
        return "EventResult({})".format(entries)

    __repr__ = __str__
55
56
57 1
class SubscriptionItemData(object):
    """
    Bookkeeping record holding the useful data of one monitored item.

    All fields start as None and are filled in by the code that
    registers the monitored item.
    """

    def __init__(self):
        # what is monitored
        self.node = None
        # identifiers: the client one is chosen locally, the server one
        # is only known once the server has answered
        self.client_handle = None
        self.server_handle = None
        # monitored attribute and optional monitoring filter
        self.attribute = None
        self.mfilter = None
67
68
69 1
class DataChangeNotif(object):
    """
    Object sent to clients for every datachange notification from the server.

    It pairs the local bookkeeping of the monitored item
    (subscription_data) with the notification item received for it
    (monitored_item).
    """

    def __init__(self, subscription_data, monitored_item):
        self.monitored_item = monitored_item
        self.subscription_data = subscription_data

    def __str__(self):
        template = "DataChangeNotfication({}, {})"
        return template.format(self.subscription_data, self.monitored_item)

    __repr__ = __str__
80
81
82 1
class Subscription(object):
    """
    Subscription object returned by Server or Client objects.

    The object represents a subscription to an opc-ua server.
    This is a high level class, especially the subscribe_data_change
    and subscribe_events methods. If more control is necessary look at
    the code and/or use the create_monitored_items method.
    """

    def __init__(self, server, params, handler):
        """
        Create the subscription on the server and send the first publish requests.

        server: object providing create_subscription, delete_subscriptions,
            create_monitored_items, delete_monitored_items and publish
        params: subscription parameters; its RequestedPublishingInterval is
            reused as sampling interval of the monitored items
        handler: object receiving the notifications (see SubHandler)
        """
        self.logger = logging.getLogger(__name__)
        self.server = server
        self._client_handle = 200
        self._handler = handler
        self.parameters = params  # move to data class
        self._monitoreditems_map = {}
        self._lock = Lock()
        # Must stay None until the server has answered: publish_callback
        # waits for a real id before processing anything.
        self.subscription_id = None
        response = self.server.create_subscription(params, self.publish_callback)
        self.subscription_id = response.SubscriptionId  # move to data class
        # Two publish requests are sent up front; each callback then sends one
        # more (presumably to always keep a request pending -- TODO confirm).
        self.server.publish()
        self.server.publish()

    def delete(self):
        """
        Delete subscription on server. This is automatically done by Client and Server classes on exit
        """
        results = self.server.delete_subscriptions([self.subscription_id])
        results[0].check()

    def publish_callback(self, publishresult):
        """
        Dispatch every notification of a publish result, then acknowledge it.

        Given to the server as callback in __init__.
        """
        self.logger.info("Publish callback called with result: %s", publishresult)
        # The callback may run before __init__ has stored the subscription id.
        while self.subscription_id is None:
            time.sleep(0.01)

        for notif in publishresult.NotificationMessage.NotificationData:
            if isinstance(notif, ua.DataChangeNotification):
                self._call_datachange(notif)
            elif isinstance(notif, ua.EventNotificationList):
                self._call_event(notif)
            elif isinstance(notif, ua.StatusChangeNotification):
                self._call_status(notif)
            else:
                self.logger.warning("Notification type not supported yet for notification %s", notif)

        ack = ua.SubscriptionAcknowledgement()
        ack.SubscriptionId = self.subscription_id
        ack.SequenceNumber = publishresult.NotificationMessage.SequenceNumber
        self.server.publish([ack])

    def _call_datachange(self, datachange):
        """
        Forward every item of a datachange notification to the handler.

        Items with an unknown client handle are logged and skipped.
        Exceptions raised by the handler are logged, never propagated.
        """
        for item in datachange.MonitoredItems:
            with self._lock:
                data = self._monitoreditems_map.get(item.ClientHandle)
            if data is None:
                self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle)
                continue
            if hasattr(self._handler, "datachange_notification"):
                event_data = DataChangeNotif(data, item)
                try:
                    self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
                except Exception:
                    self.logger.exception("Exception calling data change handler")
            elif hasattr(self._handler, "data_change"):  # deprecated API
                self.logger.warning("data_change method is deprecated, use datachange_notification")
                try:
                    self._handler.data_change(data.server_handle, data.node, item.Value.Value.Value, data.attribute)
                except Exception:
                    self.logger.exception("Exception calling deprecated data change handler")
            else:
                self.logger.error("DataChange subscription created but handler has no datachange_notification method")

    def _call_event(self, eventlist):
        """
        Build an EventResult for every event of the list and pass it to the handler.

        Events with an unknown client handle are logged and skipped, so that
        one stale notification cannot abort the whole publish callback.
        Exceptions raised by the handler are logged, never propagated.
        """
        for ev in eventlist.Events:
            with self._lock:
                data = self._monitoreditems_map.get(ev.ClientHandle)
            if data is None:
                self.logger.warning("Received a notification for unknown handle: %s", ev.ClientHandle)
                continue
            result = EventResult()
            result.server_handle = data.server_handle
            # Event fields come in the same order as the select clauses of the filter.
            for idx, sattr in enumerate(data.mfilter.SelectClauses):
                if not sattr.BrowsePath:
                    setattr(result, sattr.AttributeId.name, ev.EventFields[idx].Value)
                else:
                    setattr(result, sattr.BrowsePath[0].Name, ev.EventFields[idx].Value)
            if hasattr(self._handler, "event_notification"):
                try:
                    self._handler.event_notification(result)
                except Exception:
                    self.logger.exception("Exception calling event handler")
            elif hasattr(self._handler, "event"):  # deprecated API
                try:
                    self._handler.event(data.server_handle, result)
                except Exception:
                    self.logger.exception("Exception calling deprecated event handler")
            else:
                self.logger.error("Event subscription created but handler has no event_notification method")

    def _call_status(self, status):
        """
        Pass the status of a status change notification to the handler.

        Exceptions raised by the handler are logged, never propagated.
        """
        if not hasattr(self._handler, "status_change_notification"):
            self.logger.error("Status change received but handler has no status_change_notification method")
            return
        try:
            self._handler.status_change_notification(status.Status)
        except Exception:
            self.logger.exception("Exception calling status change handler")

    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value):
        """
        Subscribe for data change events for a node or list of nodes.
        default attribute is Value.
        Return a handle which can be used to unsubscribe
        If more control is necessary use create_monitored_items method
        """
        return self._subscribe(nodes, attr, queuesize=0)

    def _get_node(self, nodeid):
        """
        Return a Node for nodeid, which may be a Node, a ua.NodeId or
        anything accepted by the ua.NodeId constructor.
        """
        if isinstance(nodeid, ua.NodeId):
            node = Node(self.server, nodeid)
        elif isinstance(nodeid, Node):
            node = nodeid
        else:
            node = Node(self.server, ua.NodeId(nodeid))
        return node

    def _get_filter_from_event_type(self, eventtype):
        """
        Build an event filter with one select clause per property of the event type.
        """
        eventtype = self._get_node(eventtype)
        evfilter = ua.EventFilter()
        for prop in event.get_event_properties_from_type_node(eventtype):
            op = ua.SimpleAttributeOperand()
            op.TypeDefinitionId = eventtype.nodeid
            op.AttributeId = ua.AttributeIds.Value
            op.BrowsePath = [prop.get_browse_name()]
            evfilter.SelectClauses.append(op)
        return evfilter

    def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtype=ua.ObjectIds.BaseEventType):
        """
        Subscribe to events from a node. Default node is Server node.
        In most servers the server node is the only one you can subscribe to.
        Return a handle which can be used to unsubscribe
        """
        sourcenode = self._get_node(sourcenode)
        evfilter = self._get_filter_from_event_type(evtype)
        return self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter)

    def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
        """
        Create one monitored item per node and return the resulting handle(s).

        When nodes is a list or tuple, a list is returned with, for every
        node, either its handle or the status code of the failure.
        When nodes is a single node, its handle is returned and a failure
        is raised through the check() method of the status code.
        """
        is_list = isinstance(nodes, (list, tuple))
        if not is_list:
            nodes = [nodes]
        mirs = [self._make_monitored_item_request(node, attr, mfilter, queuesize) for node in nodes]

        mids = self.create_monitored_items(mirs)
        if is_list:
            return mids
        if isinstance(mids[0], ua.StatusCode):
            mids[0].check()
        return mids[0]

    def _make_monitored_item_request(self, node, attr, mfilter, queuesize):
        """
        Build the creation request for one monitored item, allocating a new client handle.
        """
        rv = ua.ReadValueId()
        rv.NodeId = node.nodeid
        rv.AttributeId = attr
        # rv.IndexRange //We leave it null, then the entire array is returned
        mparams = ua.MonitoringParameters()
        with self._lock:
            self._client_handle += 1
            mparams.ClientHandle = self._client_handle
        mparams.SamplingInterval = self.parameters.RequestedPublishingInterval
        mparams.QueueSize = queuesize
        mparams.DiscardOldest = True
        if mfilter:
            mparams.Filter = mfilter
        mir = ua.MonitoredItemCreateRequest()
        mir.ItemToMonitor = rv
        mir.MonitoringMode = ua.MonitoringMode.Reporting
        mir.RequestedParameters = mparams
        return mir

    def create_monitored_items(self, monitored_items):
        """
        low level method to have full control over subscription parameters
        Client handle must be unique since it will be used as key for internal registration of data

        Return a list with, for every requested item, either the monitored
        item id given by the server or the status code of the failure.
        If the server call itself raises, the items registered for this
        request are removed again and the exception is re-raised.
        """
        params = ua.CreateMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToCreate = monitored_items
        params.TimestampsToReturn = ua.TimestampsToReturn.Neither

        # insert monitored items into the map before sending the request, so a
        # notification arriving before the result is returned finds its data.
        # server_handle is left as None on purpose as we don't have it yet.
        handles = []
        with self._lock:
            for mi in monitored_items:
                data = SubscriptionItemData()
                data.client_handle = mi.RequestedParameters.ClientHandle
                data.node = Node(self.server, mi.ItemToMonitor.NodeId)
                data.attribute = mi.ItemToMonitor.AttributeId
                data.mfilter = mi.RequestedParameters.Filter
                self._monitoreditems_map[data.client_handle] = data
                handles.append(data.client_handle)
        try:
            results = self.server.create_monitored_items(params)
        except Exception:
            # nothing was created: do not keep entries for items that do not exist
            with self._lock:
                for client_handle in handles:
                    self._monitoreditems_map.pop(client_handle, None)
            raise
        mids = []
        # process result, add server_handle, or remove it if failed
        with self._lock:
            for idx, result in enumerate(results):
                client_handle = params.ItemsToCreate[idx].RequestedParameters.ClientHandle
                if not result.StatusCode.is_good():
                    self._monitoreditems_map.pop(client_handle, None)
                    mids.append(result.StatusCode)
                    continue
                data = self._monitoreditems_map[client_handle]
                data.server_handle = result.MonitoredItemId
                mids.append(result.MonitoredItemId)
        return mids

    def unsubscribe(self, handle):
        """
        unsubscribe to datachange or events using the handle returned while subscribing
        if you delete subscription, you do not need to unsubscribe
        """
        params = ua.DeleteMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.MonitoredItemIds = [handle]
        results = self.server.delete_monitored_items(params)
        results[0].check()
        with self._lock:
            # iterate over a snapshot: the map is modified inside the loop
            for client_handle, data in list(self._monitoreditems_map.items()):
                if data.server_handle == handle:
                    del self._monitoreditems_map[client_handle]
                    return
310
311