Completed
Push — master ( 68b0ec...7c15fc )
by Olivier
02:21
created

opcua.DataChangeNotif.__str__()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.125
Metric Value
dl 0
loc 2
ccs 1
cts 2
cp 0.5
rs 10
cc 1
crap 1.125
1
"""
2
high level interface to subscriptions
3
"""
4 1
import time
5 1
import logging
6 1
from threading import Lock
7
8 1
from opcua import ua
9 1
from opcua import Node
10 1
from opcua import ObjectIds
11 1
from opcua import AttributeIds
12
#from opcua import Event
13
14
15 1
class SubHandler(object):
16
    """
17
    Subscription Handler. To receive events from server for a subscription
18
    This class is just a sample class. Whatever class having these methods can be used
19
    """
20
21 1
    def data_change(self, handle, node, val, attr):
22
        """
23
        Deprecated, use datachange_notification
24
        """
25
        pass
26
27 1
    def datachange_notification(self, node, val, data):
28
        """
29
        called for every datachange notfication from server
30
        """
31
        pass
32
33 1
    def event(self, handle, event):
34
        """
35
        called for every event notfication from server
36
        """
37
        pass
38
39
40 1
class EventResult():
41 1
    def __str__(self):
42
        return "EventResult({})".format([str(k) + ":" + str(v) for k, v in self.__dict__.items()])
43 1
    __repr__ = __str__
44
45
46 1
class SubscriptionItemData():
47
    """
48
    To store usefull data from a monitored item
49
    """
50 1
    def __init__(self):
51 1
        self.node = None
52 1
        self.client_handle = None
53 1
        self.server_handle = None
54 1
        self.attribute = None
55 1
        self.mfilter = None
56
57
58 1
class DataChangeNotif():
59
    """
60
    To be send to clients for every notification from server
61
    """
62 1
    def __init__(self, subscription_data, monitored_item):
63
        self.monitored_item = monitored_item
64
        self.subscription_data = subscription_data
65
66 1
    def __str__(self):
67
        return "DataChangeNotfication({}, {})".format(self.suscription_data, self.monitored_item)
68 1
    __repr__ = __str__
69
70
71 1
class Subscription(object):
72
    """
73
    Subscription object returned by Server or Client objects.
74
    The object represent a subscription to an opc-ua server.
75
    This is a high level class, especially subscribe_data_change 
76
    and subscribe_events methods. If more control is necessary look at
77
    code and/or use create_monitored_items method.
78
    """
79
80 1
    def __init__(self, server, params, handler):
81 1
        self.logger = logging.getLogger(__name__)
82 1
        self.server = server
83 1
        self._client_handle = 200
84 1
        self._handler = handler
85 1
        self.parameters = params  # move to data class
86 1
        self._monitoreditems_map = {}
87 1
        self._lock = Lock()
88 1
        self.subscription_id = None
89 1
        response = self.server.create_subscription(params, self.publish_callback)
90 1
        self.subscription_id = response.SubscriptionId  # move to data class
91 1
        self.server.publish()
92 1
        self.server.publish()
93
94 1
    def delete(self):
95
        """
96
        Delete subscription on server. This is automatically done by Client and Server classes on exit
97
        """
98 1
        results = self.server.delete_subscriptions([self.subscription_id])
99 1
        results[0].check()
100
101 1
    def publish_callback(self, publishresult):
102 1
        self.logger.info("Publish callback called with result: %s", publishresult)
103 1
        while self.subscription_id is None:
104
            time.sleep(0.01)
105
106 1
        for notif in publishresult.NotificationMessage.NotificationData:
107 1
            if isinstance(notif, ua.DataChangeNotification):
108 1
                self._call_datachange(notif)
109 1
            elif isinstance(notif, ua.EventNotificationList):
110 1
                self._call_event(notif)
111
            elif isinstance(notif, ua.StatusChangeNotification):
112
                self._call_status(notif)
113
            else:
114
                self.logger.warning("Notification type not supported yet for notification %s", notif)
115
116 1
        ack = ua.SubscriptionAcknowledgement()
117 1
        ack.SubscriptionId = self.subscription_id
118 1
        ack.SequenceNumber = publishresult.NotificationMessage.SequenceNumber
119 1
        self.server.publish([ack])
120
121 1
    def _call_datachange(self, datachange):
122 1
        for item in datachange.MonitoredItems:
123 1
            with self._lock:
124 1
                if item.ClientHandle not in self._monitoreditems_map:
125 1
                    self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle)
126 1
                    continue
127 1
                data = self._monitoreditems_map[item.ClientHandle]
128 1
            try:
129 1
                if hasattr(self._handler, "datachange_notification"):
130
                    event_data = DataChangeNotif(data, item)
131
                    self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
132 1
                elif hasattr(self._handler, "data_change"): 
133 1
                    self.logger.warning("data_change method is deprecated, use datavalue_changed")
134 1
                    self._handler.data_change(data.server_handle, data.node, item.Value.Value.Value, data.attribute)
135
                else:
136
                    self.logger.error("DataChange subscription created but handler has no datachange_notification method")
137
            except Exception:
138
                self.logger.exception("Exception calling data change handler")
139
140 1
    def _call_event(self, eventlist):
141 1
        for event in eventlist.Events:
142 1
            with self._lock:
143 1
                data = self._monitoreditems_map[event.ClientHandle]
144 1
            try:
145
                #fields = {}
146 1
                result = EventResult()
147 1
                for idx, sattr in enumerate(data.mfilter.SelectClauses):
148
149 1
                    if len(sattr.BrowsePath) == 0:
150
                        #fields[ua.AttributeIdsInv[sattr.AttributeId]] = event.EventFields[idx].Value
151
                        setattr(result, ua.AttributeIdsInv[sattr.AttributeId], event.EventFields[idx].Value)
152
                    else:
153 1
                        setattr(result, sattr.BrowsePath[0].Name, event.EventFields[idx].Value)
154
                #self._handler.event(data.server_handle, fields)
155 1
                self._handler.event(data.server_handle, result)
156
            except Exception:
157
                self.logger.exception("Exception calling event handler")
158
159 1
    def _call_status(self, status):
160
        try:
161
            self._handler.status_change(status.Status)
162
        except Exception:
163
            self.logger.exception("Exception calling status change handler")
164
165 1
    def subscribe_data_change(self, nodes, attr=ua.AttributeIds.Value):
166
        """
167
        Subscribe for data change events for a node or list of nodes.
168
        default attribute is Value.
169
        Return a handle which can be used to unsubscribe
170
        If more control is necessary use create_monitored_items method
171
        """
172 1
        return self._subscribe(nodes, attr, queuesize=1)
173
174 1
    def _get_node(self, nodeid):
175 1
        if isinstance(nodeid, ua.NodeId):
176
            node = Node(self.server, nodeid)
177 1
        elif isinstance(nodeid, Node):
178
            node = nodeid
179
        else:
180 1
            node = Node(self.server, ua.NodeId(nodeid))
181 1
        return node
182
183 1
    def _get_filter_from_event_type(self, eventtype):
184 1
        eventtype = self._get_node(eventtype)
185 1
        evfilter = ua.EventFilter()
186 1
        for desc in eventtype.get_children_descriptions(refs=ua.ObjectIds.HasProperty, nodeclassmask=ua.NodeClass.Variable):
187 1
            op = ua.SimpleAttributeOperand()
188 1
            op.TypeDefinitionId = eventtype.nodeid
189 1
            op.AttributeId = AttributeIds.Value
190 1
            op.BrowsePath = [desc.BrowseName]
191 1
            evfilter.SelectClauses.append(op)
192 1
        return evfilter
193
194 1
    def subscribe_events(self, sourcenode=ObjectIds.Server, evtype=ObjectIds.BaseEventType):
195
        """
196
        Subscribe to events from a node. Default node is Server node. 
197
        In most servers the server node is the only one you can subscribe to.
198
        Return a handle which can be used to unsubscribe
199
        """
200 1
        sourcenode = self._get_node(sourcenode)
201 1
        evfilter = self._get_filter_from_event_type(evtype)
202 1
        return self._subscribe(sourcenode, AttributeIds.EventNotifier, evfilter)
203
204 1
    def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
205 1
        is_list = True
206 1
        if not type(nodes) in (list, tuple):
207 1
            is_list = False
208 1
            nodes = [nodes]
209 1
        mirs = []
210 1
        for node in nodes:
211 1
            mir = self._make_monitored_item_request(node, attr, mfilter, queuesize)
212 1
            mirs.append(mir)
213
214 1
        mids = self.create_monitored_items(mirs)
215 1
        if is_list:
216 1
            return mids
217 1
        if type(mids[0]) == ua.StatusCode:
218 1
            mids[0].check()
219 1
        return mids[0]
220
221 1
    def _make_monitored_item_request(self, node, attr, mfilter, queuesize):
222 1
        rv = ua.ReadValueId()
223 1
        rv.NodeId = node.nodeid
224 1
        rv.AttributeId = attr
225
        # rv.IndexRange //We leave it null, then the entire array is returned
226 1
        mparams = ua.MonitoringParameters()
227 1
        with self._lock:
228 1
            self._client_handle += 1
229 1
            mparams.ClientHandle = self._client_handle
230 1
        mparams.SamplingInterval = self.parameters.RequestedPublishingInterval
231 1
        mparams.QueueSize = queuesize
232 1
        mparams.DiscardOldest = True
233 1
        if mfilter:
234 1
            mparams.Filter = mfilter
235 1
        mir = ua.MonitoredItemCreateRequest()
236 1
        mir.ItemToMonitor = rv
237 1
        mir.MonitoringMode = ua.MonitoringMode.Reporting
238 1
        mir.RequestedParameters = mparams
239 1
        return mir
240
241 1
    def create_monitored_items(self, monitored_items):
242
        """
243
        low level method to have full control over subscription parameters
244
        Client handle must be unique since it will be used as key for internal registration of data
245
        """
246 1
        params = ua.CreateMonitoredItemsParameters()
247 1
        params.SubscriptionId = self.subscription_id
248 1
        params.ItemsToCreate = monitored_items
249 1
        params.TimestampsToReturn = ua.TimestampsToReturn.Neither
250
        
251 1
        mids = []
252 1
        results = self.server.create_monitored_items(params)
253
        # FIXME: Race condition here
254
        # We lock as early as possible. But in some situation, a notification may arrives before
255
        # locking and we will not be able to prosess it. To avoid issues, users should subscribe 
256
        # to all nodes at once
257 1
        with self._lock:  
258 1
            for idx, result in enumerate(results):
259 1
                mi = params.ItemsToCreate[idx]
260 1
                if not result.StatusCode.is_good():
261 1
                    mids.append(result.StatusCode)
262 1
                    continue
263 1
                data = SubscriptionItemData()
264 1
                data.client_handle = mi.RequestedParameters.ClientHandle
265 1
                data.node = Node(self.server, mi.ItemToMonitor.NodeId)
266 1
                data.attribute = mi.ItemToMonitor.AttributeId
267 1
                data.server_handle = result.MonitoredItemId
268
                #data.mfilter = result.FilterResult
269 1
                data.mfilter = mi.RequestedParameters.Filter
270 1
                self._monitoreditems_map[mi.RequestedParameters.ClientHandle] = data
271
272 1
                mids.append(result.MonitoredItemId)
273 1
        return mids
274
275 1
    def unsubscribe(self, handle):
276
        """
277
        unsubscribe to datachange or events using the handle returned while subscribing
278
        if you delete subscription, you do not need to unsubscribe
279
        """
280 1
        params = ua.DeleteMonitoredItemsParameters()
281 1
        params.SubscriptionId = self.subscription_id
282 1
        params.MonitoredItemIds = [handle]
283 1
        results = self.server.delete_monitored_items(params)
284 1
        results[0].check()
285 1
        with self._lock:
286 1
            for k, v in self._monitoreditems_map.items():
287 1
                if v.server_handle == handle:
288 1
                    del(self._monitoreditems_map[k])
289 1
                    return
290
291