Completed
Pull Request — master (#161)
by Denis
02:27
created

SubHandler.event()   A

Complexity

Conditions 1

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.125
Metric Value
cc 1
dl 0
loc 5
rs 9.4285
ccs 1
cts 2
cp 0.5
crap 1.125
"""
high level interface to subscriptions
"""
4 1
import time
5 1
import logging
6 1
from threading import Lock
7
8 1
from opcua import ua
9 1
from opcua import Node
10
from opcua.common import event
11
12 1
13
class SubHandler(object):
    """
    Subscription Handler. To receive events from server for a subscription.
    This class is just a sample class. Any class having these methods can be used.
    Every method here is a no-op placeholder.
    """

    def data_change(self, handle, node, val, attr):
        """
        Deprecated, use datachange_notification
        """
        pass

    def datachange_notification(self, node, val, data):
        """
        called for every datachange notification from server
        """
        pass

    def event_notification(self, event):
        """
        called for every event notification from server
        """
        pass

    def status_change_notification(self, status):
        """
        called for every status change notification from server
        """
        pass
42 1
43
44
class EventResult(object):
    """
    To be sent to clients for every event from server.
    Starts with a single ``server_handle`` attribute; further attributes
    may be attached to the instance and all of them show up in ``str()``.
    """

    def __init__(self):
        self.server_handle = None

    def __str__(self):
        # Render every instance attribute as "name:value"; the list itself is
        # formatted, so the output keeps the list brackets and quotes.
        return "EventResult({})".format([str(k) + ":" + str(v) for k, v in self.__dict__.items()])
    __repr__ = __str__
55 1
56
57 1
class SubscriptionItemData(object):
    """
    To store useful data from a monitored item.
    All fields start as None and are filled in by the code that creates the item.
    """

    def __init__(self):
        self.node = None
        self.client_handle = None
        self.server_handle = None
        self.attribute = None
        self.mfilter = None
67 1
68 1
69 1
class DataChangeNotif(object):
    """
    To be sent to clients for every datachange notification from server.
    Bundles the subscription bookkeeping data with the received monitored item.
    """

    def __init__(self, subscription_data, monitored_item):
        self.monitored_item = monitored_item
        self.subscription_data = subscription_data

    def __str__(self):
        # The spelling inside the string is kept exactly as it was: it is runtime output.
        return "DataChangeNotfication({}, {})".format(self.subscription_data, self.monitored_item)
    __repr__ = __str__
80 1
81
82 1
class Subscription(object):
    """
    Subscription object returned by Server or Client objects.
    The object represents a subscription to an opc-ua server.
    This is a high level class, especially subscribe_data_change
    and subscribe_events methods. If more control is necessary look at
    code and/or use create_monitored_items method.
    """

    def __init__(self, server, params, handler):
        """
        Create the subscription on the server and issue the first publish requests.

        server: object used for all service calls (create_subscription, publish,
                create_monitored_items, delete_monitored_items, delete_subscriptions)
        params: subscription parameters, forwarded to server.create_subscription;
                its RequestedPublishingInterval is reused as sampling interval
        handler: object whose notification methods are called, see SubHandler
        """
        self.logger = logging.getLogger(__name__)
        self.server = server
        # counter incremented for every monitored item request built by this object
        self._client_handle = 200
        self._handler = handler
        self.parameters = params  # move to data class
        # client handle -> SubscriptionItemData
        self._monitoreditems_map = {}
        # protects _client_handle and _monitoreditems_map
        self._lock = Lock()
        self.subscription_id = None
        response = self.server.create_subscription(params, self.publish_callback)
        self.subscription_id = response.SubscriptionId  # move to data class
        # NOTE(review): two publish requests are sent up front, presumably so that
        # one stays pending on the server while the other is answered - confirm
        self.server.publish()
        self.server.publish()

    def delete(self):
        """
        Delete subscription on server. This is automatically done by Client and Server classes on exit
        """
        results = self.server.delete_subscriptions([self.subscription_id])
        results[0].check()

    def publish_callback(self, publishresult):
        """
        Dispatch every notification of a publish result to the handler,
        then acknowledge the message and send a new publish request.
        """
        self.logger.info("Publish callback called with result: %s", publishresult)
        # __init__ stores subscription_id only after create_subscription returned,
        # so wait here until it is available
        while self.subscription_id is None:
            time.sleep(0.01)

        for notif in publishresult.NotificationMessage.NotificationData:
            if isinstance(notif, ua.DataChangeNotification):
                self._call_datachange(notif)
            elif isinstance(notif, ua.EventNotificationList):
                self._call_event(notif)
            elif isinstance(notif, ua.StatusChangeNotification):
                self._call_status(notif)
            else:
                self.logger.warning("Notification type not supported yet for notification %s", notif)

        ack = ua.SubscriptionAcknowledgement()
        ack.SubscriptionId = self.subscription_id
        ack.SequenceNumber = publishresult.NotificationMessage.SequenceNumber
        self.server.publish([ack])

    def _call_datachange(self, datachange):
        """
        Call the data change method of the handler for every monitored item
        of the notification. Items with an unknown client handle are skipped.
        """
        for item in datachange.MonitoredItems:
            with self._lock:
                data = self._monitoreditems_map.get(item.ClientHandle)
            if data is None:
                self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle)
                continue
            if hasattr(self._handler, "datachange_notification"):
                event_data = DataChangeNotif(data, item)
                try:
                    self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
                except Exception:
                    # a failing user handler must not break notification processing
                    self.logger.exception("Exception calling data change handler")
            elif hasattr(self._handler, "data_change"):  # deprecated API
                self.logger.warning("data_change method is deprecated, use datachange_notification")
                try:
                    self._handler.data_change(data.server_handle, data.node, item.Value.Value.Value, data.attribute)
                except Exception:
                    self.logger.exception("Exception calling deprecated data change handler")
            else:
                self.logger.error("DataChange subscription created but handler has no datachange_notification method")

    def _call_event(self, eventlist):
        """
        Build an EventResult for every event of the list and pass it to the handler.
        Events with an unknown client handle are logged and skipped, so that
        publish_callback can still acknowledge the message.
        """
        for ev in eventlist.Events:
            with self._lock:
                data = self._monitoreditems_map.get(ev.ClientHandle)
            if data is None:
                self.logger.warning("Received a notification for unknown handle: %s", ev.ClientHandle)
                continue
            result = EventResult()
            result.server_handle = data.server_handle
            # event fields come in the same order as the select clauses of the filter
            for idx, sattr in enumerate(data.mfilter.SelectClauses):
                if len(sattr.BrowsePath) == 0:
                    setattr(result, sattr.AttributeId.name, ev.EventFields[idx].Value)
                else:
                    setattr(result, sattr.BrowsePath[0].Name, ev.EventFields[idx].Value)
            if hasattr(self._handler, "event_notification"):
                try:
                    self._handler.event_notification(result)
                except Exception:
                    self.logger.exception("Exception calling event handler")
            elif hasattr(self._handler, "event"):  # deprecated API
                try:
                    self._handler.event(data.server_handle, result)
                except Exception:
                    self.logger.exception("Exception calling deprecated event handler")
            else:
                self.logger.error("Event subscription created but handler has no event_notification method")

    def _call_status(self, status):
        """
        Pass the status of a status change notification to the handler.
        Any exception, including a missing handler method, is logged.
        """
        try:
            self._handler.status_change_notification(status.Status)
        except Exception:
            self.logger.exception("Exception calling status change handler")

    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value):
        """
        Subscribe for data change events for a node or list of nodes.
        default attribute is Value.
        Return a handle which can be used to unsubscribe
        If more control is necessary use create_monitored_items method
        """
        return self._subscribe(nodes, attr, queuesize=0)

    def _get_node(self, nodeid):
        """
        Return a Node for the argument, which may be a ua.NodeId, a Node
        or anything accepted by the ua.NodeId constructor.
        """
        if isinstance(nodeid, ua.NodeId):
            node = Node(self.server, nodeid)
        elif isinstance(nodeid, Node):
            node = nodeid
        else:
            node = Node(self.server, ua.NodeId(nodeid))
        return node

    def _get_filter_from_event_type(self, eventtype):
        """
        Build an event filter with one select clause for every property
        reported by event.get_event_properties_from_type_node for the event type.
        """
        eventtype = self._get_node(eventtype)
        evfilter = ua.EventFilter()
        for prop in event.get_event_properties_from_type_node(eventtype):
            op = ua.SimpleAttributeOperand()
            op.TypeDefinitionId = eventtype.nodeid
            op.AttributeId = ua.AttributeIds.Value
            op.BrowsePath = [prop.get_browse_name()]
            evfilter.SelectClauses.append(op)
        return evfilter

    def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtype=ua.ObjectIds.BaseEventType):
        """
        Subscribe to events from a node. Default node is Server node.
        In most servers the server node is the only one you can subscribe to.
        Return a handle which can be used to unsubscribe
        """
        sourcenode = self._get_node(sourcenode)
        evfilter = self._get_filter_from_event_type(evtype)
        return self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter)

    def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
        """
        Create monitored items for one node or a list/tuple of nodes.
        For a list or tuple the list of results of create_monitored_items is returned.
        For a single node the single result is returned; if that result is a
        status code, its check() method is called first.
        """
        is_list = isinstance(nodes, (list, tuple))
        if not is_list:
            nodes = [nodes]
        mirs = [self._make_monitored_item_request(node, attr, mfilter, queuesize) for node in nodes]

        mids = self.create_monitored_items(mirs)
        if is_list:
            return mids
        if isinstance(mids[0], ua.StatusCode):
            # NOTE(review): check() is expected to raise for a bad status - confirm
            mids[0].check()
        return mids[0]

    def _make_monitored_item_request(self, node, attr, mfilter, queuesize):
        """
        Build a monitored item create request for one attribute of a node,
        using a fresh client handle.
        """
        rv = ua.ReadValueId()
        rv.NodeId = node.nodeid
        rv.AttributeId = attr
        # rv.IndexRange //We leave it null, then the entire array is returned
        mparams = ua.MonitoringParameters()
        with self._lock:
            self._client_handle += 1
            mparams.ClientHandle = self._client_handle
        mparams.SamplingInterval = self.parameters.RequestedPublishingInterval
        mparams.QueueSize = queuesize
        mparams.DiscardOldest = True
        if mfilter:
            mparams.Filter = mfilter
        mir = ua.MonitoredItemCreateRequest()
        mir.ItemToMonitor = rv
        mir.MonitoringMode = ua.MonitoringMode.Reporting
        mir.RequestedParameters = mparams
        return mir

    def create_monitored_items(self, monitored_items):
        """
        low level method to have full control over subscription parameters
        Client handle must be unique since it will be used as key for internal registration of data
        Return a list with, for every request, the monitored item id on success
        or the status code on failure.
        """
        params = ua.CreateMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToCreate = monitored_items
        params.TimestampsToReturn = ua.TimestampsToReturn.Neither

        # insert monitored items into map before the request, so that a notification
        # arriving before the result is returned can already be dispatched.
        # server_handle is left as None on purpose as we don't have it yet.
        with self._lock:
            for mi in monitored_items:
                data = SubscriptionItemData()
                data.client_handle = mi.RequestedParameters.ClientHandle
                data.node = Node(self.server, mi.ItemToMonitor.NodeId)
                data.attribute = mi.ItemToMonitor.AttributeId
                data.mfilter = mi.RequestedParameters.Filter
                self._monitoreditems_map[mi.RequestedParameters.ClientHandle] = data
        results = self.server.create_monitored_items(params)
        mids = []
        # process result, add server_handle, or remove the entry if creation failed
        with self._lock:
            for idx, result in enumerate(results):
                mi = params.ItemsToCreate[idx]
                if not result.StatusCode.is_good():
                    del self._monitoreditems_map[mi.RequestedParameters.ClientHandle]
                    mids.append(result.StatusCode)
                    continue
                data = self._monitoreditems_map[mi.RequestedParameters.ClientHandle]
                data.server_handle = result.MonitoredItemId
                mids.append(result.MonitoredItemId)
        return mids

    def unsubscribe(self, handle):
        """
        unsubscribe to datachange or events using the handle returned while subscribing
        if you delete subscription, you do not need to unsubscribe
        """
        params = ua.DeleteMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.MonitoredItemIds = [handle]
        results = self.server.delete_monitored_items(params)
        results[0].check()
        with self._lock:
            for k, v in self._monitoreditems_map.items():
                if v.server_handle == handle:
                    # deleting while iterating is safe here: we return immediately
                    del self._monitoreditems_map[k]
                    return
310
311