Passed
Push — dev ( 3fa5c0 )
by Olivier
03:18
created

translate_browsepaths_to_nodeids()   A

Complexity

Conditions 1

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 1
Metric Value
cc 1
dl 0
loc 8
ccs 8
cts 8
cp 1
crap 1
rs 9.4285
1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9 1
from functools import partial
10
11 1
from opcua import ua
12 1
from opcua.common import utils
13
14
15 1
class UASocketClient(object):
    """
    Handle the socket connection and send ua messages.

    Requests are written to the socket by the calling thread. A background
    thread (see start() and _run()) reads incoming messages and resolves the
    Future that was registered in _callbackmap for the matching request id.

    timeout is the timeout (passed to Future.result, i.e. seconds) used while
    waiting for an ua answer from server
    """

    def __init__(self, timeout=1, security_policy=None):
        """
        timeout: how long blocking calls wait for an answer from the server
        security_policy: policy handed to ua.SecureConnection; when None a
        new ua.SecurityPolicy() is created for this instance
        """
        # Build the default per instance: a default written in the signature
        # is evaluated once and would be shared by every client object.
        if security_policy is None:
            security_policy = ua.SecurityPolicy()
        self.logger = logging.getLogger(__name__ + "Socket")
        self._thread = None
        # protects _callbackmap and the request id / request handle counters
        self._lock = Lock()
        self.timeout = timeout
        self._socket = None
        self._do_stop = False
        self.authentication_token = ua.NodeId()
        self._request_id = 0
        self._request_handle = 0
        # request id -> Future resolved by the receiving thread
        self._callbackmap = {}
        self._connection = ua.SecureConnection(security_policy)

    def start(self):
        """
        Start receiving thread.
        this is called automatically in connect and
        should not be necessary to call directly
        """
        self._thread = Thread(target=self._run)
        self._thread.start()

    def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server, lower-level method
        timeout is the timeout written in ua header
        callback, if given, is attached to the future as a done callback
        returns future
        """
        with self._lock:
            request.RequestHeader = self._create_request_header(timeout)
            try:
                binreq = request.to_binary()
            except BaseException:
                # reset request handle if any error
                # see self._create_request_header
                self._request_handle -= 1
                raise
            self._request_id += 1
            future = Future()
            if callback:
                future.add_done_callback(callback)
            self._callbackmap[self._request_id] = future
            try:
                msg = self._connection.message_to_binary(binreq, message_type, self._request_id)
                self._socket.write(msg)
            except BaseException:
                # the request could not be sent: do not keep a future that
                # nobody will ever resolve in the callback map
                self._callbackmap.pop(self._request_id, None)
                raise
        return future

    def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server.
        timeout is the timeout written in ua header
        returns the answer data if no callback is provided, after waiting up
        to self.timeout for it and running check_answer on it.
        When a callback is provided nothing is waited for and None is returned
        """
        future = self._send_request(request, callback, timeout, message_type)
        if not callback:
            data = future.result(self.timeout)
            self.check_answer(data, " in response to " + request.__class__.__name__)
            return data

    def check_answer(self, data, context):
        """
        Look at the type id at the start of data and detect a ServiceFault.

        Returns True when the answer is not a ServiceFault. For a ServiceFault
        a warning is logged (context is appended to the message) and
        ServiceResult.check() is called on the response header; False is
        returned if that call returns normally (presumably check() raises on
        a bad status -- confirm in ua.StatusCode).
        """
        # work on a copy so the caller's buffer is left as it was
        data = data.copy()
        typeid = ua.NodeId.from_binary(data)
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
            self.logger.warning("ServiceFault from server received %s", context)
            hdr = ua.ResponseHeader.from_binary(data)
            hdr.ServiceResult.check()
            return False
        return True

    def _run(self):
        """
        Body of the receiving thread: call _receive() until a stop is
        requested or the socket is reported closed.
        """
        self.logger.info("Thread started")
        while not self._do_stop:
            try:
                self._receive()
            except ua.utils.SocketClosedException:
                self.logger.info("Socket has closed connection")
                break
            except ua.UaError:
                # A protocol-level problem (e.g. an answer whose future was
                # already dropped, or an unsupported message) must not kill
                # the receiving thread: log it and keep reading.
                self.logger.exception("Protocol Error")
        self.logger.info("Thread ended")

    def _receive(self):
        """
        Read one message from the socket and dispatch it.

        ua.Message bodies go to the future registered for their request id,
        an ua.Acknowledge goes to the future registered under id 0 (see
        send_hello), an ua.ErrorMessage is only logged.
        Raises ua.UaError for any other non-None message.
        """
        msg = self._connection.receive_from_socket(self._socket)
        if msg is None:
            return
        elif isinstance(msg, ua.Message):
            self._call_callback(msg.request_id(), msg.body())
        elif isinstance(msg, ua.Acknowledge):
            self._call_callback(0, msg)
        elif isinstance(msg, ua.ErrorMessage):
            self.logger.warning("Received an error: %s", msg)
        else:
            raise ua.UaError("Unsupported message type: {}".format(msg))

    def _call_callback(self, request_id, body):
        """
        Resolve the future registered for request_id with body.
        Raises ua.UaError when no future is registered under that id.
        """
        with self._lock:
            future = self._callbackmap.pop(request_id, None)
            if future is None:
                raise ua.UaError("No future object found for request: {}, callbacks in list are {}".format(request_id, self._callbackmap.keys()))
        # resolved outside the lock: done callbacks run from set_result
        future.set_result(body)

    def _create_request_header(self, timeout=1000):
        """
        Build a ua.RequestHeader carrying the authentication token, the next
        request handle and timeout as TimeoutHint.
        Increments self._request_handle; callers are expected to hold
        self._lock (as _send_request does).
        """
        hdr = ua.RequestHeader()
        hdr.AuthenticationToken = self.authentication_token
        self._request_handle += 1
        hdr.RequestHandle = self._request_handle
        hdr.TimeoutHint = timeout
        return hdr

    def connect_socket(self, host, port):
        """
        connect to server socket and start receiving thread
        """
        self.logger.info("opening connection")
        sock = socket.create_connection((host, port))
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # nodelay necessary to avoid packing in one frame, some servers do not like it
        self._socket = utils.SocketWrapper(sock)
        self.start()

    def disconnect_socket(self):
        """
        Ask the receiving thread to stop, then shut down and close the socket.
        """
        self.logger.info("stop request")
        self._do_stop = True
        self._socket.socket.shutdown(socket.SHUT_WR)
        self._socket.socket.close()

    def send_hello(self, url):
        """
        Send a Hello message for url and wait up to self.timeout for the
        answer, which the receiving thread delivers under request id 0.
        Returns that answer.
        """
        hello = ua.Hello()
        hello.EndpointUrl = url
        future = Future()
        with self._lock:
            # id 0 is the slot _receive uses for ua.Acknowledge messages
            self._callbackmap[0] = future
        binmsg = self._connection.tcp_to_binary(ua.MessageType.Hello, hello)
        self._socket.write(binmsg)
        ack = future.result(self.timeout)
        return ack

    def open_secure_channel(self, params):
        """
        Send an OpenSecureChannelRequest with params, wait up to self.timeout
        for the answer, hand the received security token to the connection
        and return the response parameters.
        """
        self.logger.info("open_secure_channel")
        request = ua.OpenSecureChannelRequest()
        request.Parameters = params
        future = self._send_request(request, message_type=ua.MessageType.SecureOpen)

        response = ua.OpenSecureChannelResponse.from_binary(future.result(self.timeout))
        response.ResponseHeader.ServiceResult.check()
        self._connection.set_security_token(response.Parameters.SecurityToken)
        return response.Parameters

    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepare to reconnect.
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close socket
        """
        self.logger.info("close_secure_channel")
        request = ua.CloseSecureChannelRequest()
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
        with self._lock:
            # don't expect any more answers
            future.cancel()
            self._callbackmap.clear()

        # some servers send a response here, most do not ... so we ignore
179
180
181 1
class UaClient(object):

    """
    low level OPC-UA client.
    implement all(well..one day) methods defined in opcua spec
    taking in argument the structures defined in opcua spec
    in python most of the structures are defined in
    uaprotocol_auto.py and uaprotocol_hand.py

    Most service methods follow the same pattern: build the request
    structure, send it through the UASocketClient, decode the answer with the
    matching response class, call ServiceResult.check() on its header and
    return the relevant part of the response.
    """

    def __init__(self, timeout=1):
        """
        timeout is given to the UASocketClient created in connect_socket and
        is also used when waiting for subscription create/delete answers
        """
        self.logger = logging.getLogger(__name__)
        # _publishcallbacks should be accessed in recv thread only
        # maps subscription id -> callback called with publish results
        self._publishcallbacks = {}
        self._timeout = timeout
        self._uasocket = None
        self._security_policy = ua.SecurityPolicy()

    def set_security(self, policy):
        """
        Store the security policy used by the next connect_socket() call.
        """
        self._security_policy = policy

    def connect_socket(self, host, port):
        """
        connect to server socket and start receiving thread
        """
        self._uasocket = UASocketClient(self._timeout, security_policy=self._security_policy)
        return self._uasocket.connect_socket(host, port)

    def disconnect_socket(self):
        """
        Delegate to the socket client's disconnect_socket().
        """
        return self._uasocket.disconnect_socket()

    def send_hello(self, url):
        """
        Delegate to the socket client's send_hello() and return its answer.
        """
        return self._uasocket.send_hello(url)

    def open_secure_channel(self, params):
        """
        Delegate to the socket client's open_secure_channel().
        """
        return self._uasocket.open_secure_channel(params)

    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepare to reconnect
        """
        return self._uasocket.close_secure_channel()

    def create_session(self, parameters):
        """
        Send a CreateSessionRequest and return the response parameters.
        The authentication token of the answer is stored on the socket
        client, which writes it in the header of the following requests.
        """
        self.logger.info("create_session")
        request = ua.CreateSessionRequest()
        request.Parameters = parameters
        data = self._uasocket.send_request(request)
        response = ua.CreateSessionResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        self._uasocket.authentication_token = response.Parameters.AuthenticationToken
        return response.Parameters

    def activate_session(self, parameters):
        """
        Send an ActivateSessionRequest and return the response parameters.
        """
        self.logger.info("activate_session")
        request = ua.ActivateSessionRequest()
        request.Parameters = parameters
        data = self._uasocket.send_request(request)
        response = ua.ActivateSessionResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters

    def close_session(self, deletesubscriptions):
        """
        Send a CloseSessionRequest with the given DeleteSubscriptions flag.
        The answer is decoded but its service result is not checked.
        """
        self.logger.info("close_session")
        request = ua.CloseSessionRequest()
        request.DeleteSubscriptions = deletesubscriptions
        data = self._uasocket.send_request(request)
        ua.CloseSessionResponse.from_binary(data)
        # response.ResponseHeader.ServiceResult.check() #disabled, it seems we sent wrong session Id, but where is the sessionId supposed to be sent???

    def browse(self, parameters):
        """
        Send a BrowseRequest and return the Results of the response.
        """
        self.logger.info("browse")
        request = ua.BrowseRequest()
        request.Parameters = parameters
        data = self._uasocket.send_request(request)
        response = ua.BrowseResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def read(self, parameters):
        """
        Send a ReadRequest and return the Results of the response.

        Results are matched by position with parameters.NodesToRead: good
        values read from a NodeClass attribute are converted to ua.NodeClass
        and good values read from a ValueRank attribute are converted to
        ua.ValueRank when they are in the range -3..4.
        """
        self.logger.info("read")
        request = ua.ReadRequest()
        request.Parameters = parameters
        data = self._uasocket.send_request(request)
        response = ua.ReadResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        # cast to Enum attributes that need to
        for idx, rv in enumerate(parameters.NodesToRead):
            if rv.AttributeId == ua.AttributeIds.NodeClass:
                dv = response.Results[idx]
                if dv.StatusCode.is_good():
                    dv.Value.Value = ua.NodeClass(dv.Value.Value)
            elif rv.AttributeId == ua.AttributeIds.ValueRank:
                dv = response.Results[idx]
                # values outside this range are left as plain integers
                if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
                    dv.Value.Value = ua.ValueRank(dv.Value.Value)
        return response.Results

    def write(self, params):
        """
        Send a WriteRequest and return the Results of the response.
        """
        self.logger.info("write")
        request = ua.WriteRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.WriteResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def get_endpoints(self, params):
        """
        Send a GetEndpointsRequest and return the Endpoints of the response.
        """
        self.logger.info("get_endpoint")
        request = ua.GetEndpointsRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.GetEndpointsResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Endpoints

    def find_servers(self, params):
        """
        Send a FindServersRequest and return the Servers of the response.
        """
        self.logger.info("find_servers")
        request = ua.FindServersRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.FindServersResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Servers

    def find_servers_on_network(self, params):
        """
        Send a FindServersOnNetworkRequest and return the response parameters.
        """
        self.logger.info("find_servers_on_network")
        request = ua.FindServersOnNetworkRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.FindServersOnNetworkResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters

    def register_server(self, registered_server):
        """
        Send a RegisterServerRequest for registered_server.
        Only the service result is checked; nothing is returned.
        """
        self.logger.info("register_server")
        request = ua.RegisterServerRequest()
        request.Server = registered_server
        data = self._uasocket.send_request(request)
        response = ua.RegisterServerResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        # nothing to return for this service

    def register_server2(self, params):
        """
        Send a RegisterServer2Request and return the ConfigurationResults of
        the response.
        """
        self.logger.info("register_server2")
        request = ua.RegisterServer2Request()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.RegisterServer2Response.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.ConfigurationResults

    def translate_browsepaths_to_nodeids(self, browsepaths):
        """
        Send a TranslateBrowsePathsToNodeIdsRequest for browsepaths and
        return the Results of the response.
        """
        self.logger.info("translate_browsepath_to_nodeid")
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
        request.Parameters.BrowsePaths = browsepaths
        data = self._uasocket.send_request(request)
        response = ua.TranslateBrowsePathsToNodeIdsResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def create_subscription(self, params, callback):
        """
        Send a CreateSubscriptionRequest and return the response parameters.

        callback is registered for the new subscription id and later called
        with the parameters of every publish response for that subscription.
        The answer is handled by _create_subscription_callback; this method
        waits up to self._timeout for it.
        """
        self.logger.info("create_subscription")
        request = ua.CreateSubscriptionRequest()
        request.Parameters = params
        resp_fut = Future()
        mycallbak = partial(self._create_subscription_callback, callback, resp_fut)
        self._uasocket.send_request(request, mycallbak)
        return resp_fut.result(self._timeout)

    def _create_subscription_callback(self, pub_callback, resp_fut, data_fut):
        """
        Done callback of the create subscription request: decode the answer,
        register pub_callback under the subscription id and resolve resp_fut
        with the response parameters.
        """
        self.logger.info("_create_subscription_callback")
        data = data_fut.result()
        response = ua.CreateSubscriptionResponse.from_binary(data)
        # NOTE(review): an exception raised here happens inside a future done
        # callback, so resp_fut is never resolved and the caller of
        # create_subscription only sees a timeout.
        response.ResponseHeader.ServiceResult.check()
        self._publishcallbacks[response.Parameters.SubscriptionId] = pub_callback
        resp_fut.set_result(response.Parameters)

    def delete_subscriptions(self, subscriptionids):
        """
        Send a DeleteSubscriptionsRequest for subscriptionids and return the
        Results of the response, waiting up to self._timeout for them.
        The publish callbacks of these subscriptions are unregistered.
        """
        self.logger.info("delete_subscription")
        request = ua.DeleteSubscriptionsRequest()
        request.Parameters.SubscriptionIds = subscriptionids
        resp_fut = Future()
        mycallbak = partial(self._delete_subscriptions_callback, subscriptionids, resp_fut)
        self._uasocket.send_request(request, mycallbak)
        return resp_fut.result(self._timeout)

    def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut):
        """
        Done callback of the delete subscriptions request: decode the answer,
        forget the publish callbacks of subscriptionids and resolve resp_fut
        with the Results of the response.
        """
        self.logger.info("_delete_subscriptions_callback")
        data = data_fut.result()
        response = ua.DeleteSubscriptionsResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        for sid in subscriptionids:
            # an id without registered callback (e.g. deleted twice) must not
            # raise here, otherwise resp_fut would never be resolved
            self._publishcallbacks.pop(sid, None)
        resp_fut.set_result(response.Results)

    def publish(self, acks=None):
        """
        Send a PublishRequest carrying the acknowledgements in acks (an empty
        list when None). Nothing is waited for: the answer is handled by
        _call_publish_callback.
        """
        self.logger.info("publish")
        if acks is None:
            acks = []
        request = ua.PublishRequest()
        request.Parameters.SubscriptionAcknowledgements = acks
        self._uasocket.send_request(request, self._call_publish_callback, timeout=int(9e8))  # timeout could be set to 0 but some servers to not support it

    def _call_publish_callback(self, future):
        """
        Done callback of a publish request: decode the PublishResponse and
        pass its parameters to the callback registered for its subscription.

        An answer that cannot be decoded is logged and a new empty publish
        request is sent. Data for an unknown subscription is logged and
        dropped. Exceptions raised by the user callback are logged and
        swallowed.
        """
        self.logger.info("call_publish_callback")
        data = future.result()
        self._uasocket.check_answer(data, "ServiceFault received from server while waiting for publish response")
        try:
            response = ua.PublishResponse.from_binary(data)
        except Exception:
            self.logger.exception("Error parsing notificatipn from server")
            self.publish([])  # send publish request to server so it does not stop sending notifications
            return
        if response.Parameters.SubscriptionId not in self._publishcallbacks:
            self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
            return
        callback = self._publishcallbacks[response.Parameters.SubscriptionId]
        try:
            callback(response.Parameters)
        except Exception:  # we call client code, catch everything!
            self.logger.exception("Exception while calling user callback: %s", callback)

    def create_monitored_items(self, params):
        """
        Send a CreateMonitoredItemsRequest and return the Results of the
        response.
        """
        self.logger.info("create_monitored_items")
        request = ua.CreateMonitoredItemsRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.CreateMonitoredItemsResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def delete_monitored_items(self, params):
        """
        Send a DeleteMonitoredItemsRequest and return the Results of the
        response.
        """
        self.logger.info("delete_monitored_items")
        request = ua.DeleteMonitoredItemsRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.DeleteMonitoredItemsResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def add_nodes(self, nodestoadd):
        """
        Send an AddNodesRequest for nodestoadd and return the Results of the
        response.
        """
        self.logger.info("add_nodes")
        request = ua.AddNodesRequest()
        request.Parameters.NodesToAdd = nodestoadd
        data = self._uasocket.send_request(request)
        response = ua.AddNodesResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def delete_nodes(self, nodestodelete):
        """
        Send a DeleteNodesRequest for nodestodelete and return the Results of
        the response.
        """
        self.logger.info("delete_nodes")
        request = ua.DeleteNodesRequest()
        request.Parameters.NodesToDelete = nodestodelete
        data = self._uasocket.send_request(request)
        response = ua.DeleteNodesResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def call(self, methodstocall):
        """
        Send a CallRequest for methodstocall and return the Results of the
        response.
        """
        self.logger.info("call")
        request = ua.CallRequest()
        request.Parameters.MethodsToCall = methodstocall
        data = self._uasocket.send_request(request)
        response = ua.CallResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def history_read(self, params):
        """
        Send a HistoryReadRequest and return the Results of the response.
        """
        self.logger.info("history_read")
        request = ua.HistoryReadRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.HistoryReadResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results
457