Completed
Push — master ( 94ecad...929991 )
by Olivier
02:07
created

Subscription.__init__()   A

Complexity

Conditions 1

Size

Total Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 1
Metric Value
cc 1
dl 0
loc 13
rs 9.4285
ccs 13
cts 13
cp 1
crap 1
1
"""
high level interface to subscriptions

Provides the Subscription class together with the small helper classes
handed to user supplied handlers when notifications are received.
"""

import time
import logging
from threading import Lock

from opcua import ua
from opcua import Node
10
11
12 1
class SubHandler(object):
    """
    Subscription Handler. To receive events from server for a subscription
    This class is just a sample class. Whatever class having these methods can be used

    Every method here is a no-op; override the ones you are interested in.
    """

    def data_change(self, handle, node, val, attr):
        """
        Deprecated, use datachange_notification

        handle: server side handle of the monitored item
        node: node the notification is about
        val: new value
        attr: attribute being monitored
        """
        pass

    def datachange_notification(self, node, val, data):
        """
        called for every datachange notification from server

        node: node the notification is about
        val: new value
        data: DataChangeNotif object carrying the raw notification
        """
        pass

    def event_notification(self, event):
        """
        called for every event notification from server

        event: EventResult object holding the selected event fields
        """
        pass

    def status_change_notification(self, status):
        """
        called for every status change notification from server

        status: status reported by the notification
        """
        pass
41
42
43 1
class EventResult(object):
    """
    To be sent to clients for every events from server

    Event fields are stored as plain instance attributes next to
    server_handle, which identifies the monitored item on the server.
    """

    def __init__(self):
        self.server_handle = None

    def __str__(self):
        return "EventResult({})".format([str(k) + ":" + str(v) for k, v in self.__dict__.items()])
    __repr__ = __str__

    def get_event_props_as_fields_dict(self):
        """
        convert all properties of the EventResult class to a dict of variants

        server_handle and attributes whose name starts with "__" are left out.
        Return a dict mapping attribute name to ua.Variant(value).
        """
        field_vars = {}
        for key, value in vars(self).items():
            # compare by value: identity of two equal strings is an
            # implementation detail and must not be relied upon
            if not key.startswith("__") and key != "server_handle":
                field_vars[key] = ua.Variant(value)
        return field_vars
64
65
66 1
class SubscriptionItemData(object):
    """
    To store useful data from a monitored item
    """

    def __init__(self):
        # Node object being monitored
        self.node = None
        # handle chosen on our side, used as key when notifications arrive
        self.client_handle = None
        # handle assigned by the server, None until the creation result is known
        self.server_handle = None
        # id of the monitored attribute
        self.attribute = None
        # filter sent with the monitoring request, if any
        self.mfilter = None
76
77
78 1
class DataChangeNotif(object):
    """
    To be send to clients for every datachange notification from server

    subscription_data: data stored locally for the monitored item
    monitored_item: notification item as received from the server
    """

    def __init__(self, subscription_data, monitored_item):
        self.monitored_item = monitored_item
        self.subscription_data = subscription_data

    def __str__(self):
        return "DataChangeNotification({}, {})".format(self.subscription_data, self.monitored_item)
    __repr__ = __str__
89
90
91 1
class Subscription(object):
    """
    Subscription object returned by Server or Client objects.
    The object represent a subscription to an opc-ua server.
    This is a high level class, especially subscribe_data_change
    and subscribe_events methods. If more control is necessary look at
    code and/or use create_monitored_items method.
    """

    def __init__(self, server, params, handler):
        """
        Create the subscription on the server and start publishing.

        server: object offering create_subscription, publish,
            delete_subscriptions, create_monitored_items and
            delete_monitored_items
        params: subscription parameters, passed as is to
            server.create_subscription; RequestedPublishingInterval is
            reused as sampling interval of monitored items
        handler: object receiving the notifications, see SubHandler
        """
        self.logger = logging.getLogger(__name__)
        self.server = server
        # client handles are incremented before use, the first one is 201
        self._client_handle = 200
        self._handler = handler
        self.parameters = params  # move to data class
        # client handle -> SubscriptionItemData
        self._monitoreditems_map = {}
        self._lock = Lock()
        # stays None until create_subscription has answered, publish_callback
        # waits on this
        self.subscription_id = None
        response = self.server.create_subscription(params, self.publish_callback)
        self.subscription_id = response.SubscriptionId  # move to data class
        # NOTE(review): presumably two publish requests are kept outstanding
        # on purpose - confirm before changing
        self.server.publish()
        self.server.publish()

    def delete(self):
        """
        Delete subscription on server. This is automatically done by Client and Server classes on exit
        """
        results = self.server.delete_subscriptions([self.subscription_id])
        results[0].check()

    def publish_callback(self, publishresult):
        """
        Dispatch the notifications of a publish result to the handler and
        acknowledge the message to the server.

        publishresult: object with a NotificationMessage holding
            NotificationData and SequenceNumber
        """
        self.logger.info("Publish callback called with result: %s", publishresult)
        # subscription_id is only set once create_subscription has returned
        # in __init__; wait for it since it is needed for the acknowledgement
        while self.subscription_id is None:
            time.sleep(0.01)

        for notif in publishresult.NotificationMessage.NotificationData:
            if isinstance(notif, ua.DataChangeNotification):
                self._call_datachange(notif)
            elif isinstance(notif, ua.EventNotificationList):
                self._call_event(notif)
            elif isinstance(notif, ua.StatusChangeNotification):
                self._call_status(notif)
            else:
                self.logger.warning("Notification type not supported yet for notification %s", notif)

        ack = ua.SubscriptionAcknowledgement()
        ack.SubscriptionId = self.subscription_id
        ack.SequenceNumber = publishresult.NotificationMessage.SequenceNumber
        self.server.publish([ack])

    def _call_datachange(self, datachange):
        """
        Call the handler for every item of a data change notification.
        Items with an unknown client handle are logged and skipped,
        exceptions raised by the handler are logged and swallowed.
        """
        for item in datachange.MonitoredItems:
            # the lock only protects the map lookup, the handler runs unlocked
            with self._lock:
                if item.ClientHandle not in self._monitoreditems_map:
                    self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle)
                    continue
                data = self._monitoreditems_map[item.ClientHandle]
            if hasattr(self._handler, "datachange_notification"):
                event_data = DataChangeNotif(data, item)
                try:
                    self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
                except Exception:
                    self.logger.exception("Exception calling data change handler")
            elif hasattr(self._handler, "data_change"):  # deprecated API
                self.logger.warning("data_change method is deprecated, use datachange_notification")
                try:
                    self._handler.data_change(data.server_handle, data.node, item.Value.Value.Value, data.attribute)
                except Exception:
                    self.logger.exception("Exception calling deprecated data change handler")
            else:
                self.logger.error("DataChange subscription created but handler has no datachange_notification method")

    def _call_event(self, eventlist):
        """
        Build an EventResult for every event of the list and pass it to the
        handler. Events with an unknown client handle are logged and
        skipped, exceptions raised by the handler are logged and swallowed.
        """
        for event in eventlist.Events:
            with self._lock:
                # same guard as in _call_datachange: a stray handle must not
                # raise KeyError and abort the whole publish callback
                if event.ClientHandle not in self._monitoreditems_map:
                    self.logger.warning("Received a notification for unknown handle: %s", event.ClientHandle)
                    continue
                data = self._monitoreditems_map[event.ClientHandle]
            result = EventResult()
            result.server_handle = data.server_handle
            # event fields come in the same order as the select clauses
            for idx, sattr in enumerate(data.mfilter.SelectClauses):
                if len(sattr.BrowsePath) == 0:
                    setattr(result, sattr.AttributeId.name, event.EventFields[idx].Value)
                else:
                    setattr(result, sattr.BrowsePath[0].Name, event.EventFields[idx].Value)
            if hasattr(self._handler, "event_notification"):
                try:
                    self._handler.event_notification(result)
                except Exception:
                    self.logger.exception("Exception calling event handler")
            elif hasattr(self._handler, "event"):  # deprecated API
                try:
                    self._handler.event(data.server_handle, result)
                except Exception:
                    self.logger.exception("Exception calling deprecated event handler")
            else:
                self.logger.error("Event subscription created but handler has no event_notification method")

    def _call_status(self, status):
        """
        Pass the status of a status change notification to the handler,
        any exception is logged and swallowed.
        """
        try:
            self._handler.status_change_notification(status.Status)
        except Exception:
            self.logger.exception("Exception calling status change handler")

    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value):
        """
        Subscribe for data change events for a node or list of nodes.
        default attribute is Value.
        Return a handle which can be used to unsubscribe
        If more control is necessary use create_monitored_items method
        """
        return self._subscribe(nodes, attr, queuesize=0)

    def _get_node(self, nodeid):
        """
        Return a Node for nodeid, which may be a ua.NodeId, a Node (returned
        as is) or anything accepted by the ua.NodeId constructor.
        """
        if isinstance(nodeid, ua.NodeId):
            node = Node(self.server, nodeid)
        elif isinstance(nodeid, Node):
            node = nodeid
        else:
            node = Node(self.server, ua.NodeId(nodeid))
        return node

    def _get_filter_from_event_type(self, eventtype):
        """
        Build a ua.EventFilter with select and where clauses derived from
        the given event type.
        """
        eventtype = self._get_node(eventtype)
        evfilter = ua.EventFilter()
        evfilter.SelectClauses = self._select_clauses_from_evtype(eventtype)
        evfilter.WhereClause = self._where_clause_from_evtype(eventtype)
        return evfilter

    def _select_clauses_from_evtype(self, evtype):
        """
        Return one ua.SimpleAttributeOperand per property found by
        get_event_properties_from_type_node for the event type.

        Raise TypeError if the properties cannot be resolved.
        """
        properties = get_event_properties_from_type_node(evtype)
        if properties is None:
            # iterating the None result raised a bare TypeError before, keep
            # the exception type but say what actually went wrong
            raise TypeError("Could not resolve the properties of event type {}".format(evtype))
        clauses = []
        for prop in properties:
            op = ua.SimpleAttributeOperand()
            op.TypeDefinitionId = evtype.nodeid
            op.AttributeId = ua.AttributeIds.Value
            op.BrowsePath = [prop.get_browse_name()]
            clauses.append(op)
        return clauses

    def _where_clause_from_evtype(self, evtype):
        """
        Return a ua.ContentFilter with a single InList element: the first
        operand selects the EventType field, the following ones are the
        node ids of the event type and of all its subtypes.
        """
        cf = ua.ContentFilter()
        el = ua.ContentFilterElement()
        # operands can be ElementOperand, LiteralOperand, AttributeOperand, SimpleAttribute
        op = ua.SimpleAttributeOperand()
        op.TypeDefinitionId = evtype.nodeid
        op.BrowsePath.append(ua.QualifiedName("EventType", 0))
        op.AttributeId = ua.AttributeIds.Value
        el.FilterOperands.append(op)
        for subtypeid in [st.nodeid for st in self._get_subtypes(evtype)]:
            op = ua.LiteralOperand()
            op.Value = ua.Variant(subtypeid)
            el.FilterOperands.append(op)
        el.FilterOperator = ua.FilterOperator.InList

        cf.Elements.append(el)
        return cf

    def _get_subtypes(self, parent, nodes=None):
        """
        Return parent followed by all nodes reachable from it through
        HasSubtype references. nodes is the accumulator of the recursion
        and should not be given by callers.
        """
        if nodes is None:
            nodes = [parent]
        for child in parent.get_children(refs=ua.ObjectIds.HasSubtype):
            nodes.append(child)
            self._get_subtypes(child, nodes)
        return nodes

    def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtype=ua.ObjectIds.BaseEventType, evfilter=None):
        """
        Subscribe to events from a node. Default node is Server node.
        In most servers the server node is the only one you can subscribe to.
        if evfilter is provided, evtype is ignored
        Return a handle which can be used to unsubscribe
        """
        sourcenode = self._get_node(sourcenode)
        if evfilter is None:
            evfilter = self._get_filter_from_event_type(evtype)
        return self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter)

    def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
        """
        Create one monitored item per node.

        If nodes is a list or tuple, return the list of results (monitored
        item id or ua.StatusCode of the failure). If nodes is a single node,
        return its monitored item id; a failure status is checked first.
        """
        is_list = isinstance(nodes, (list, tuple))
        if not is_list:
            nodes = [nodes]
        mirs = [self._make_monitored_item_request(node, attr, mfilter, queuesize) for node in nodes]

        mids = self.create_monitored_items(mirs)
        if is_list:
            return mids
        if isinstance(mids[0], ua.StatusCode):
            mids[0].check()
        return mids[0]

    def _make_monitored_item_request(self, node, attr, mfilter, queuesize):
        """
        Return a ua.MonitoredItemCreateRequest for one attribute of node,
        using a fresh client handle and the publishing interval of the
        subscription as sampling interval.
        """
        rv = ua.ReadValueId()
        rv.NodeId = node.nodeid
        rv.AttributeId = attr
        # rv.IndexRange //We leave it null, then the entire array is returned
        mparams = ua.MonitoringParameters()
        # handle allocation is locked so that concurrent callers never get
        # the same client handle
        with self._lock:
            self._client_handle += 1
            mparams.ClientHandle = self._client_handle
        mparams.SamplingInterval = self.parameters.RequestedPublishingInterval
        mparams.QueueSize = queuesize
        mparams.DiscardOldest = True
        if mfilter:
            mparams.Filter = mfilter
        mir = ua.MonitoredItemCreateRequest()
        mir.ItemToMonitor = rv
        mir.MonitoringMode = ua.MonitoringMode.Reporting
        mir.RequestedParameters = mparams
        return mir

    def create_monitored_items(self, monitored_items):
        """
        low level method to have full control over subscription parameters
        Client handle must be unique since it will be used as key for internal registration of data

        monitored_items: list of ua.MonitoredItemCreateRequest
        Return a list with, for every request and in the same order, the
        monitored item id given by the server or the ua.StatusCode of the
        failure.
        """
        params = ua.CreateMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.ItemsToCreate = monitored_items
        params.TimestampsToReturn = ua.TimestampsToReturn.Neither

        # insert monitored item into map to avoid notification arrive before result return
        # server_handle is left as None in purpose as we don't get it yet.
        with self._lock:
            for mi in monitored_items:
                data = SubscriptionItemData()
                data.client_handle = mi.RequestedParameters.ClientHandle
                data.node = Node(self.server, mi.ItemToMonitor.NodeId)
                data.attribute = mi.ItemToMonitor.AttributeId
                data.mfilter = mi.RequestedParameters.Filter
                self._monitoreditems_map[mi.RequestedParameters.ClientHandle] = data
        results = self.server.create_monitored_items(params)
        mids = []
        # process result, add server_handle, or remove it if failed
        with self._lock:
            for idx, result in enumerate(results):
                mi = params.ItemsToCreate[idx]
                if not result.StatusCode.is_good():
                    del self._monitoreditems_map[mi.RequestedParameters.ClientHandle]
                    mids.append(result.StatusCode)
                    continue
                data = self._monitoreditems_map[mi.RequestedParameters.ClientHandle]
                data.server_handle = result.MonitoredItemId
                mids.append(result.MonitoredItemId)
        return mids

    def unsubscribe(self, handle):
        """
        unsubscribe to datachange or events using the handle returned while subscribing
        if you delete subscription, you do not need to unsubscribe
        """
        params = ua.DeleteMonitoredItemsParameters()
        params.SubscriptionId = self.subscription_id
        params.MonitoredItemIds = [handle]
        results = self.server.delete_monitored_items(params)
        results[0].check()
        with self._lock:
            # iterate over a snapshot so that the entry can safely be removed
            # from the map inside the loop
            for client_handle, data in list(self._monitoreditems_map.items()):
                if data.server_handle == handle:
                    del self._monitoreditems_map[client_handle]
                    return
353
354
355
def get_event_properties_from_type_node(node):
    """
    Collect the properties of an event type node and of all its supertypes.

    Starting at node, the properties of the current node are added to the
    result and the inverse HasSubtype reference is followed to the parent,
    until the node whose identifier is BaseEventType has been processed.

    Return the list of property nodes, or None if a node on the way does
    not have exactly one parent. Callers must handle the None case.
    """
    properties = []
    curr_node = node

    while True:
        properties.extend(curr_node.get_properties())

        # BaseEventType is the root of the event type hierarchy walked here
        if curr_node.nodeid.Identifier == ua.ObjectIds.BaseEventType:
            break

        parents = curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
        if len(parents) != 1:  # Something went wrong
            return None
        curr_node = parents[0]

    return properties
371