Completed
Pull Request — master (#172)
by
unknown
06:10
created

EventResult.__init__()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1
Metric Value
cc 1
dl 0
loc 2
rs 10
ccs 2
cts 2
cp 1
crap 1
"""
high level interface to subscriptions
"""
4 1
import time
5 1
import logging
6 1
from threading import Lock
7
8 1
from opcua import ua
9 1
from opcua import Node
10 1
from opcua.common import event
11
12
13 1
class SubHandler(object):
    """
    Subscription Handler. To receive events from server for a subscription.

    This class is just a sample class. Any class having these methods can be
    used. Every method here is a no-op that returns None.
    """

    def data_change(self, handle, node, val, attr):
        """
        Deprecated, use datachange_notification
        """
        pass

    def datachange_notification(self, node, val, data):
        """
        called for every datachange notification from server
        """
        pass

    def event_notification(self, event):
        """
        called for every event notification from server
        """
        pass

    def status_change_notification(self, status):
        """
        called for every status change notification from server
        """
        pass
42
43
44 1
class EventResult(object):
    """
    To be sent to clients for every event from server.

    Event fields are stored as plain instance attributes next to
    ``server_handle``.
    """

    def __init__(self):
        self.server_handle = None

    def __str__(self):
        return "EventResult({})".format([str(k) + ":" + str(v) for k, v in self.__dict__.items()])
    __repr__ = __str__

    def get_field_variants(self):
        """
        Return a dict mapping every instance attribute name to
        ``ua.Variant(value)``, skipping ``server_handle`` and any name
        starting with a double underscore.
        """
        # Compare by value: identity ("is not") against a string literal only
        # works when the interpreter happens to intern both strings.
        return {
            key: ua.Variant(value)
            for key, value in vars(self).items()
            if not key.startswith("__") and key != "server_handle"
        }
62 1
63 1
64 1
class SubscriptionItemData(object):
    """
    To store useful data from a monitored item.

    All fields start as None and are filled in by the code that registers
    the monitored item.
    """

    def __init__(self):
        self.node = None
        self.client_handle = None
        self.server_handle = None
        self.attribute = None
        self.mfilter = None
74 1
75 1
76
class DataChangeNotif(object):
    """
    To be sent to clients for every datachange notification from server.

    Holds the subscription data and the monitored item it was built from.
    """

    def __init__(self, subscription_data, monitored_item):
        self.monitored_item = monitored_item
        self.subscription_data = subscription_data

    def __str__(self):
        return "DataChangeNotfication({}, {})".format(self.subscription_data, self.monitored_item)
    __repr__ = __str__
87
88
89
class Subscription(object):
    """
    Subscription object returned by Server or Client objects.
    The object represents a subscription to an opc-ua server.
    This is a high level class, especially subscribe_data_change
    and subscribe_events methods. If more control is necessary look at
    code and/or use create_monitored_items method.
    """

    def __init__(self, server, params, handler):
        """
        Create the subscription on ``server`` and register
        ``publish_callback`` as its notification callback.

        ``handler`` is the object whose ``*_notification`` methods are called
        when notifications arrive (see SubHandler for the expected methods).
        """
        self.logger = logging.getLogger(__name__)
        self.server = server
        self._client_handle = 200
        self._handler = handler
        self.parameters = params  # move to data class
        self._monitoreditems_map = {}
        self._lock = Lock()
        # Stays None until create_subscription returns; publish_callback
        # waits on this before processing anything.
        self.subscription_id = None
        response = self.server.create_subscription(params, self.publish_callback)
        self.subscription_id = response.SubscriptionId  # move to data class
        # NOTE(review): two publish requests are issued on purpose in the
        # original code, presumably to keep more than one outstanding - confirm.
        self.server.publish()
        self.server.publish()

    def delete(self):
        """
        Delete subscription on server. This is automatically done by Client and Server classes on exit
        """
        results = self.server.delete_subscriptions([self.subscription_id])
        results[0].check()

    def publish_callback(self, publishresult):
        """
        Dispatch every notification contained in ``publishresult`` to the
        handler, then acknowledge the message with a new publish request.
        """
        self.logger.info("Publish callback called with result: %s", publishresult)
        # The callback can fire before __init__ has stored the subscription id.
        while self.subscription_id is None:
            time.sleep(0.01)

        for notif in publishresult.NotificationMessage.NotificationData:
            if isinstance(notif, ua.DataChangeNotification):
                self._call_datachange(notif)
            elif isinstance(notif, ua.EventNotificationList):
                self._call_event(notif)
            elif isinstance(notif, ua.StatusChangeNotification):
                self._call_status(notif)
            else:
                self.logger.warning("Notification type not supported yet for notification %s", notif)

        ack = ua.SubscriptionAcknowledgement()
        ack.SubscriptionId = self.subscription_id
        ack.SequenceNumber = publishresult.NotificationMessage.SequenceNumber
        self.server.publish([ack])

    def _call_datachange(self, datachange):
        """
        Call the handler once per monitored item in a data change
        notification. Exceptions raised by the handler are logged, not
        propagated.
        """
        for item in datachange.MonitoredItems:
            with self._lock:
                data = self._monitoreditems_map.get(item.ClientHandle)
            if data is None:
                self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle)
                continue
            if hasattr(self._handler, "datachange_notification"):
                event_data = DataChangeNotif(data, item)
                try:
                    self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
                except Exception:
                    self.logger.exception("Exception calling data change handler")
            elif hasattr(self._handler, "data_change"):  # deprecated API
                self.logger.warning("data_change method is deprecated, use datachange_notification")
                try:
                    self._handler.data_change(data.server_handle, data.node, item.Value.Value.Value, data.attribute)
                except Exception:
                    self.logger.exception("Exception calling deprecated data change handler")
            else:
                self.logger.error("DataChange subscription created but handler has no datachange_notification method")

    def _call_event(self, eventlist):
        """
        Build an EventResult for every event in ``eventlist`` and pass it to
        the handler. Exceptions raised by the handler are logged, not
        propagated.
        """
        for ev in eventlist.Events:
            with self._lock:
                data = self._monitoreditems_map.get(ev.ClientHandle)
            if data is None:
                # Same policy as _call_datachange: skip instead of letting a
                # KeyError escape from the publish callback.
                self.logger.warning("Received a notification for unknown handle: %s", ev.ClientHandle)
                continue
            result = EventResult()
            result.server_handle = data.server_handle
            # Event fields arrive in the same order as the select clauses.
            for idx, sattr in enumerate(data.mfilter.SelectClauses):
                if len(sattr.BrowsePath) == 0:
                    setattr(result, sattr.AttributeId.name, ev.EventFields[idx].Value)
                else:
                    setattr(result, sattr.BrowsePath[0].Name, ev.EventFields[idx].Value)
            if hasattr(self._handler, "event_notification"):
                try:
                    self._handler.event_notification(result)
                except Exception:
                    self.logger.exception("Exception calling event handler")
            elif hasattr(self._handler, "event"):  # deprecated API
                try:
                    self._handler.event(data.server_handle, result)
                except Exception:
                    self.logger.exception("Exception calling deprecated event handler")
            else:
                self.logger.error("Event subscription created but handler has no event_notification method")

    def _call_status(self, status):
        """
        Forward a status change to the handler; any exception (including a
        missing handler method) is logged.
        """
        try:
            self._handler.status_change_notification(status.Status)
        except Exception:
            self.logger.exception("Exception calling status change handler")

    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value):
        """
        Subscribe for data change events for a node or list of nodes.
        default attribute is Value.
        Return a handle which can be used to unsubscribe
        If more control is necessary use create_monitored_items method
        """
        return self._subscribe(nodes, attr, queuesize=0)

    def _get_node(self, nodeid):
        """
        Return a Node for ``nodeid``, which may be a Node, a ua.NodeId or
        anything accepted by the ua.NodeId constructor.
        """
        if isinstance(nodeid, ua.NodeId):
            node = Node(self.server, nodeid)
        elif isinstance(nodeid, Node):
            node = nodeid
        else:
            node = Node(self.server, ua.NodeId(nodeid))
        return node

    def _get_filter_from_event_type(self, eventtype):
        """
        Build a ua.EventFilter with one select clause per property returned
        by event.get_event_properties_from_type_node for ``eventtype``.
        """
        eventtype = self._get_node(eventtype)
        evfilter = ua.EventFilter()
        for prop in event.get_event_properties_from_type_node(eventtype):
            op = ua.SimpleAttributeOperand()
            op.TypeDefinitionId = eventtype.nodeid
            op.AttributeId = ua.AttributeIds.Value
            op.BrowsePath = [prop.get_browse_name()]
            evfilter.SelectClauses.append(op)
        return evfilter

    def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtype=ua.ObjectIds.BaseEventType):
        """
        Subscribe to events from a node. Default node is Server node.
        In most servers the server node is the only one you can subscribe to.
        Return a handle which can be used to unsubscribe
        """
        sourcenode = self._get_node(sourcenode)
        evfilter = self._get_filter_from_event_type(evtype)
        return self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter)

    def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
        """
        Create monitored items for one node or a list/tuple of nodes.

        For a list/tuple the list of results is returned as is (handles or
        status codes). For a single node the handle is returned, and a failing
        status code is checked (which is expected to raise) first.
        """
        is_list = isinstance(nodes, (list, tuple))
        if not is_list:
            nodes = [nodes]
        mirs = [self._make_monitored_item_request(node, attr, mfilter, queuesize) for node in nodes]

        mids = self.create_monitored_items(mirs)
        if is_list:
            return mids
        if isinstance(mids[0], ua.StatusCode):
            mids[0].check()
        return mids[0]

    def _make_monitored_item_request(self, node, attr, mfilter, queuesize):
        """
        Build a ua.MonitoredItemCreateRequest for ``node``/``attr`` with a
        freshly allocated client handle.
        """
        rv = ua.ReadValueId()
        rv.NodeId = node.nodeid
        rv.AttributeId = attr
        # rv.IndexRange //We leave it null, then the entire array is returned
        mparams = ua.MonitoringParameters()
        with self._lock:
            self._client_handle += 1
            mparams.ClientHandle = self._client_handle
        mparams.SamplingInterval = self.parameters.RequestedPublishingInterval
        mparams.QueueSize = queuesize
        mparams.DiscardOldest = True
        if mfilter:
            mparams.Filter = mfilter
        mir = ua.MonitoredItemCreateRequest()
        mir.ItemToMonitor = rv
        mir.MonitoringMode = ua.MonitoringMode.Reporting
        mir.RequestedParameters = mparams
        return mir

    def create_monitored_items(self, monitored_items):
        """
        low level method to have full control over subscription parameters
        Client handle must be unique since it will be used as key for internal registration of data

        Return a list with, per requested item, either the server's
        MonitoredItemId or the failing StatusCode.
        """
        params = ua.CreateMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToCreate = monitored_items
        params.TimestampsToReturn = ua.TimestampsToReturn.Neither

        # insert monitored item into map to avoid notification arriving before result returns
        # server_handle is left as None on purpose as we don't have it yet.
        with self._lock:
            for mi in monitored_items:
                data = SubscriptionItemData()
                data.client_handle = mi.RequestedParameters.ClientHandle
                data.node = Node(self.server, mi.ItemToMonitor.NodeId)
                data.attribute = mi.ItemToMonitor.AttributeId
                data.mfilter = mi.RequestedParameters.Filter
                self._monitoreditems_map[mi.RequestedParameters.ClientHandle] = data
        results = self.server.create_monitored_items(params)
        mids = []
        # process result, add server_handle, or remove it if failed
        with self._lock:
            for idx, result in enumerate(results):
                mi = params.ItemsToCreate[idx]
                if not result.StatusCode.is_good():
                    del self._monitoreditems_map[mi.RequestedParameters.ClientHandle]
                    mids.append(result.StatusCode)
                    continue
                data = self._monitoreditems_map[mi.RequestedParameters.ClientHandle]
                data.server_handle = result.MonitoredItemId
                mids.append(result.MonitoredItemId)
        return mids

    def unsubscribe(self, handle):
        """
        unsubscribe to datachange or events using the handle returned while subscribing
        if you delete subscription, you do not need to unsubscribe
        """
        params = ua.DeleteMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.MonitoredItemIds = [handle]
        results = self.server.delete_monitored_items(params)
        results[0].check()
        with self._lock:
            for k, v in self._monitoreditems_map.items():
                if v.server_handle == handle:
                    # Safe to delete while iterating only because we return
                    # immediately afterwards.
                    del self._monitoreditems_map[k]
                    return
317
318