Completed
Push — master ( 558fdd...a80a7b )
by Olivier
02:58
created

opcua.UASocketClient.close_secure_channel()   A

Complexity

Conditions 2

Size

Total Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 2
Metric Value
dl 0
loc 13
ccs 7
cts 7
cp 1
rs 9.4286
cc 2
crap 2
1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9
10 1
import opcua.uaprotocol as ua
11 1
import opcua.utils as utils
12
13
14 1
class CachedRequest(object):
15 1
    def __init__(self, binary):
16
        self.binary = binary
17
18 1
    def to_binary(self):
19
        return self.binary
20
21
22 1
class UASocketClient(object):
23
    """
24
    handle socket connection and send ua messages
25
    timeout is the timeout used while waiting for an ua answer from server
26
    """
27 1
    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy()):
28 1
        self.logger = logging.getLogger(__name__ + "Socket")
29 1
        self._thread = None
30 1
        self._lock = Lock()
31 1
        self.timeout = timeout
32 1
        self._socket = None
33 1
        self._do_stop = False
34 1
        self._security_token = ua.ChannelSecurityToken()
35 1
        self.authentication_token = ua.NodeId()
36 1
        self._sequence_number = 0
37 1
        self._request_id = 0
38 1
        self._request_handle = 0
39 1
        self._callbackmap = {}
40 1
        self._security_policy = security_policy
41 1
        self._max_chunk_size = 65536
42
43 1
    def start(self):
44
        """
45
        Start receiving thread.
46
        this is called automatically in connect and
47
        should not be necessary to call directly
48
        """
49 1
        self._thread = Thread(target=self._run)
50 1
        self._thread.start()
51
52 1
    def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
53
        """
54
        send request to server, lower-level method
55
        timeout is the timeout written in ua header
56
        returns future
57
        """
58 1
        with self._lock:
59 1
            request.RequestHeader = self._create_request_header(timeout)
60 1
            try:
61 1
                binreq = request.to_binary()
62
            except:
63
                # reset reqeust handle if any error
64
                # see self._create_request_header
65
                self._request_handle -= 1
66
                raise
67 1
            self._request_id += 1
68 1
            future = Future()
69 1
            if callback:
70 1
                future.add_done_callback(callback)
71 1
            self._callbackmap[self._request_id] = future
72 1
            for chunk in ua.MessageChunk.message_to_chunks(self._security_policy, binreq, self._max_chunk_size,
73
                    message_type=message_type,
74
                    channel_id=self._security_token.ChannelId,
75
                    request_id=self._request_id,
76
                    token_id=self._security_token.TokenId):
77 1
                self._sequence_number += 1
78 1
                chunk.SequenceHeader.SequenceNumber = self._sequence_number
79 1
                self._socket.write(chunk.to_binary())
80 1
        return future
81
82 1
    def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
83
        """
84
        send request to server.
85
        timeout is the timeout written in ua header
86
        returns response object if no callback is provided
87
        """
88 1
        future = self._send_request(request, callback, timeout, message_type)
89 1
        if not callback:
90 1
            data = future.result(self.timeout)
91 1
            self.check_answer(data, " in response to " + request.__class__.__name__)
92 1
            return data
93
94 1
    def check_answer(self, data, context):
95 1
        data = data.copy(50)  # FIXME check max length nodeid + responseheader
96 1
        typeid = ua.NodeId.from_binary(data)
97 1
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
98
            self.logger.warning("ServiceFault from server received %s", context)
99
            hdr = ua.ResponseHeader.from_binary(data)
100
            hdr.ServiceResult.check()
101
            return False
102 1
        return True
103
104 1
    def _run(self):
105 1
        self.logger.info("Thread started")
106 1
        while not self._do_stop:
107 1
            try:
108 1
                self._receive()
109 1
            except ua.utils.SocketClosedException:
110 1
                self.logger.info("Socket has closed connection")
111 1
                break
112 1
        self.logger.info("Thread ended")
113
114 1
    def _receive_header(self):
115 1
        self.logger.debug("Waiting for header")
116 1
        header = ua.Header.from_string(self._socket)
117 1
        self.logger.info("received header: %s", header)
118 1
        return header
119
120 1
    def _receive_body(self, size):
121 1
        self.logger.debug("reading body of message (%s bytes)", size)
122 1
        data = self._socket.read(size)
123 1
        if size != len(data):
124
            raise Exception("Error, did not receive expected number of bytes, got {}, asked for {}".format(len(data), size))
125 1
        return utils.Buffer(data)
126
127 1
    def _receive(self):
128 1
        body_chunk = b""
129 1
        while True:
130 1
            ret = self._receive_complete_msg()
131 1
            if ret is None:
132 1
                return
133 1
            hdr, algohdr, seqhdr, body = ret 
134 1
            if hdr.ChunkType in (b"F", b"A"):
135 1
                body.data = body_chunk + body.data
136 1
                break
137
            elif hdr.ChunkType == b"C":
138
                self.logger.debug("Received an intermediate message with header %s, waiting for next message", hdr)
139
                body_chunk += body.data
140
            else:
141
                self.logger.warning("Received a message with unknown ChunkType %s, in header %s", hdr.ChunkType, hdr)
142
                return
143 1
        self._call_callback(seqhdr.RequestId, body)
144
145 1
    def _receive_complete_msg(self):
146 1
        header = self._receive_header()
147 1
        if header is None:
148
            return None
149 1
        body = self._receive_body(header.body_size)
150 1
        if header.MessageType == ua.MessageType.Error:
151
            self.logger.warning("Received an error message type")
152
            err = ua.ErrorMessage.from_binary(body)
153
            self.logger.warning(err)
154
            return None
155 1
        if header.MessageType == ua.MessageType.Acknowledge:
156 1
            self._call_callback(0, body)
157 1
            return None
158 1
        elif header.MessageType == ua.MessageType.SecureOpen:
159 1
            algohdr = ua.AsymmetricAlgorithmHeader.from_binary(body)
160 1
            self.logger.info(algohdr)
161 1
        elif header.MessageType == ua.MessageType.SecureMessage:
162 1
            algohdr = ua.SymmetricAlgorithmHeader.from_binary(body)
163 1
            self.logger.info(algohdr)
164
        else:
165
            self.logger.warning("Unsupported message type: %s", header.MessageType)
166
            return None
167 1
        seqhdr = ua.SequenceHeader.from_binary(body)
168 1
        self.logger.debug(seqhdr)
169 1
        return header, algohdr, seqhdr, body
170
171 1
    def _call_callback(self, request_id, body):
172 1
        with self._lock:
173 1
            future = self._callbackmap.pop(request_id, None)
174 1
            if future is None:
175 1
                raise Exception("No future object found for request: {}, callbacks in list are {}".format(request_id, self._callbackmap.keys()))
176 1
        future.set_result(body)
177
178 1
    def _write_socket(self, hdr, *args):
179 1
        alle = []
180 1
        for arg in args:
181 1
            data = arg.to_binary()
182 1
            hdr.add_size(len(data))
183 1
            self.logger.debug("writting to socket: %s with length %s ", type(arg), len(data))
184 1
            self.logger.debug("struct: %s", arg)
185 1
            self.logger.debug("data: %s", data)
186 1
            alle.append(data)
187 1
        alle.insert(0, hdr.to_binary())
188 1
        alle = b"".join(alle)
189 1
        self._socket.write(alle)
190
191 1
    def _create_request_header(self, timeout=1000):
192 1
        hdr = ua.RequestHeader()
193 1
        hdr.AuthenticationToken = self.authentication_token
194 1
        self._request_handle += 1
195 1
        hdr.RequestHandle = self._request_handle
196 1
        hdr.TimeoutHint = timeout
197 1
        return hdr
198
199 1
    def connect_socket(self, host, port):
200
        """
201
        connect to server socket and start receiving thread
202
        """
203 1
        self.logger.info("opening connection")
204 1
        sock = socket.create_connection((host, port))
205 1
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # nodelay ncessary to avoid packing in one frame, some servers do not like it
206 1
        self._socket = utils.SocketWrapper(sock)
207 1
        self.start()
208
209 1
    def disconnect_socket(self):
210 1
        self.logger.info("stop request")
211 1
        self._do_stop = True
212 1
        self._socket.socket.shutdown(socket.SHUT_WR)
213
214 1
    def send_hello(self, url):
215 1
        hello = ua.Hello()
216 1
        hello.EndpointUrl = url
217 1
        header = ua.Header(ua.MessageType.Hello, ua.ChunkType.Single)
218 1
        future = Future()
219 1
        with self._lock:
220 1
            self._callbackmap[0] = future
221 1
        self._write_socket(header, hello)
222 1
        ack = ua.Acknowledge.from_binary(future.result(self.timeout))
223 1
        self._max_chunk_size = ack.SendBufferSize	# client shouldn't send chunks larger than this
224 1
        return ack
225
226 1
    def open_secure_channel(self, params):
227 1
        self.logger.info("open_secure_channel")
228 1
        request = ua.OpenSecureChannelRequest()
229 1
        request.Parameters = params
230 1
        future = self._send_request(request, message_type=ua.MessageType.SecureOpen)
231
232 1
        response = ua.OpenSecureChannelResponse.from_binary(future.result(self.timeout))
233 1
        response.ResponseHeader.ServiceResult.check()
234 1
        self._security_token = response.Parameters.SecurityToken
235 1
        return response.Parameters
236
237 1
    def close_secure_channel(self):
238
        """
239
        close secure channel. It seems to trigger a shutdown of socket
240
        in most servers, so be prepare to reconnect.
241
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close socket
242
        """
243 1
        self.logger.info("close_secure_channel")
244 1
        request = ua.CloseSecureChannelRequest()
245 1
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
246 1
        with self._lock:
247
            # don't expect any more answers
248 1
            future.cancel()
249 1
            self._callbackmap.clear()
250
251
        # some servers send a response here, most do not ... so we ignore
252
253
254
255 1
class BinaryClient(object):
256
257
    """
258
    low level OPC-UA client.
259
    implement all(well..one day) methods defined in opcua spec
260
    taking in argument the structures defined in opcua spec
261
    in python most of the structures are defined in
262
    uaprotocol_auto.py and uaprotocol_hand.py
263
    """
264
265 1
    def __init__(self, timeout=1):
266 1
        self.logger = logging.getLogger(__name__)
267 1
        self._publishcallbacks = {}
268 1
        self._lock = Lock()
269 1
        self._timeout = timeout
270 1
        self._uasocket = None
271
272 1
    def connect_socket(self, host, port):
273
        """
274
        connect to server socket and start receiving thread
275
        """
276 1
        self._uasocket = UASocketClient(self._timeout)
277 1
        return self._uasocket.connect_socket(host, port)
278
279 1
    def disconnect_socket(self):
280 1
        return self._uasocket.disconnect_socket()
281
282 1
    def send_hello(self, url):
283 1
        return self._uasocket.send_hello(url)
284
285 1
    def open_secure_channel(self, params):
286 1
        return self._uasocket.open_secure_channel(params)
287
288 1
    def close_secure_channel(self):
289
        """
290
        close secure channel. It seems to trigger a shutdown of socket
291
        in most servers, so be prepare to reconnect
292
        """
293 1
        return self._uasocket.close_secure_channel()
294
295 1
    def create_session(self, parameters):
296 1
        self.logger.info("create_session")
297 1
        request = ua.CreateSessionRequest()
298 1
        request.Parameters = parameters
299 1
        data = self._uasocket.send_request(request)
300 1
        response = ua.CreateSessionResponse.from_binary(data)
301 1
        response.ResponseHeader.ServiceResult.check()
302 1
        self._uasocket.authentication_token = response.Parameters.AuthenticationToken
303 1
        return response.Parameters
304
305 1
    def activate_session(self, parameters):
306 1
        self.logger.info("activate_session")
307 1
        request = ua.ActivateSessionRequest()
308 1
        request.Parameters = parameters
309 1
        data = self._uasocket.send_request(request)
310 1
        response = ua.ActivateSessionResponse.from_binary(data)
311 1
        response.ResponseHeader.ServiceResult.check()
312 1
        return response.Parameters
313
314 1
    def close_session(self, deletesubscriptions):
315 1
        self.logger.info("close_session")
316 1
        request = ua.CloseSessionRequest()
317 1
        request.DeleteSubscriptions = deletesubscriptions
318 1
        data = self._uasocket.send_request(request)
319 1
        ua.CloseSessionResponse.from_binary(data)
320
        # response.ResponseHeader.ServiceResult.check() #disabled, it seems we sent wrong session Id, but where is the sessionId supposed to be sent???
321
322 1
    def browse(self, parameters):
323 1
        self.logger.info("browse")
324 1
        request = ua.BrowseRequest()
325 1
        request.Parameters = parameters
326 1
        data = self._uasocket.send_request(request)
327 1
        response = ua.BrowseResponse.from_binary(data)
328 1
        response.ResponseHeader.ServiceResult.check()
329 1
        return response.Results
330
331 1
    def read(self, parameters):
332 1
        self.logger.info("read")
333 1
        request = ua.ReadRequest()
334 1
        request.Parameters = parameters
335 1
        data = self._uasocket.send_request(request)
336 1
        response = ua.ReadResponse.from_binary(data)
337 1
        response.ResponseHeader.ServiceResult.check()
338 1
        return response.Results
339
340 1
    def write(self, params):
341 1
        self.logger.info("read")
342 1
        request = ua.WriteRequest()
343 1
        request.Parameters = params
344 1
        data = self._uasocket.send_request(request)
345 1
        response = ua.WriteResponse.from_binary(data)
346 1
        response.ResponseHeader.ServiceResult.check()
347 1
        return response.Results
348
349 1
    def get_endpoints(self, params):
350 1
        self.logger.info("get_endpoint")
351 1
        request = ua.GetEndpointsRequest()
352 1
        request.Parameters = params
353 1
        data = self._uasocket.send_request(request)
354 1
        response = ua.GetEndpointsResponse.from_binary(data)
355 1
        response.ResponseHeader.ServiceResult.check()
356 1
        return response.Endpoints
357
358 1
    def find_servers(self, params):
359 1
        self.logger.info("find_servers")
360 1
        request = ua.FindServersRequest()
361 1
        request.Parameters = params
362 1
        data = self._uasocket.send_request(request)
363 1
        response = ua.FindServersResponse.from_binary(data)
364 1
        response.ResponseHeader.ServiceResult.check()
365 1
        return response.Servers
366
367 1
    def find_servers_on_network(self, params):
368
        self.logger.info("find_servers_on_network")
369
        request = ua.FindServersOnNetworkRequest()
370
        request.Parameters = params
371
        data = self._uasocket.send_request(request)
372
        response = ua.FindServersOnNetworkResponse.from_binary(data)
373
        response.ResponseHeader.ServiceResult.check()
374
        return response.Parameters
375
376 1
    def register_server(self, registered_server):
377 1
        self.logger.info("register_server")
378 1
        request = ua.RegisterServerRequest()
379 1
        request.Server = registered_server
380 1
        data = self._uasocket.send_request(request)
381 1
        response = ua.RegisterServerResponse.from_binary(data)
382 1
        response.ResponseHeader.ServiceResult.check()
383
        # nothing to return for this service
384
385 1
    def register_server2(self, params):
386
        self.logger.info("register_server2")
387
        request = ua.RegisterServer2Request()
388
        request.Parameters = params
389
        data = self._uasocket.send_request(request)
390
        response = ua.RegisterServer2Response.from_binary(data)
391
        response.ResponseHeader.ServiceResult.check()
392
        return response.ConfigurationResults
393
394 1
    def translate_browsepaths_to_nodeids(self, browsepaths):
395 1
        self.logger.info("translate_browsepath_to_nodeid")
396 1
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
397 1
        request.Parameters.BrowsePaths = browsepaths
398 1
        data = self._uasocket.send_request(request)
399 1
        response = ua.TranslateBrowsePathsToNodeIdsResponse.from_binary(data)
400 1
        response.ResponseHeader.ServiceResult.check()
401 1
        return response.Results
402
403 1
    def create_subscription(self, params, callback):
404 1
        self.logger.info("create_subscription")
405 1
        request = ua.CreateSubscriptionRequest()
406 1
        request.Parameters = params
407 1
        data = self._uasocket.send_request(request)
408 1
        response = ua.CreateSubscriptionResponse.from_binary(data)
409 1
        response.ResponseHeader.ServiceResult.check()
410 1
        with self._lock:
411 1
            self._publishcallbacks[response.Parameters.SubscriptionId] = callback
412 1
        return response.Parameters
413
414 1
    def delete_subscriptions(self, subscriptionids):
415 1
        self.logger.info("delete_subscription")
416 1
        request = ua.DeleteSubscriptionsRequest()
417 1
        request.Parameters.SubscriptionIds = subscriptionids
418 1
        data = self._uasocket.send_request(request)
419 1
        response = ua.DeleteSubscriptionsResponse.from_binary(data)
420 1
        response.ResponseHeader.ServiceResult.check()
421 1
        for sid in subscriptionids:
422 1
            with self._lock:
423 1
                self._publishcallbacks.pop(sid)
424 1
        return response.Results
425
426 1
    def publish(self, acks=None):
427 1
        self.logger.info("publish")
428 1
        if acks is None:
429 1
            acks = []
430 1
        request = ua.PublishRequest()
431 1
        request.Parameters.SubscriptionAcknowledgements = acks
432 1
        self._uasocket.send_request(request, self._call_publish_callback, timeout=int(9e8))  # timeout could be set to 0 but some servers to not support it
433
434 1
    def _call_publish_callback(self, future):
435 1
        self.logger.info("call_publish_callback")
436 1
        data = future.result()
437 1
        self._uasocket.check_answer(data, "ServiceFault received from server while waiting for publish response")
438 1
        response = ua.PublishResponse.from_binary(data)
439 1
        with self._lock:
440 1
            if response.Parameters.SubscriptionId not in self._publishcallbacks:
441 1
                self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
442 1
                return
443 1
            callback = self._publishcallbacks[response.Parameters.SubscriptionId]
444 1
        try:
445 1
            callback(response.Parameters)
446
        except Exception:  # we call client code, catch everything!
447
            self.logger.exception("Exception while calling user callback: %s")
448
449 1
    def create_monitored_items(self, params):
450 1
        self.logger.info("create_monitored_items")
451 1
        request = ua.CreateMonitoredItemsRequest()
452 1
        request.Parameters = params
453 1
        data = self._uasocket.send_request(request)
454 1
        response = ua.CreateMonitoredItemsResponse.from_binary(data)
455 1
        response.ResponseHeader.ServiceResult.check()
456 1
        return response.Results
457
458 1
    def delete_monitored_items(self, params):
459 1
        self.logger.info("delete_monitored_items")
460 1
        request = ua.DeleteMonitoredItemsRequest()
461 1
        request.Parameters = params
462 1
        data = self._uasocket.send_request(request)
463 1
        response = ua.DeleteMonitoredItemsResponse.from_binary(data)
464 1
        response.ResponseHeader.ServiceResult.check()
465 1
        return response.Results
466
467 1
    def add_nodes(self, nodestoadd):
468 1
        self.logger.info("add_nodes")
469 1
        request = ua.AddNodesRequest()
470 1
        request.Parameters.NodesToAdd = nodestoadd
471 1
        data = self._uasocket.send_request(request)
472 1
        response = ua.AddNodesResponse.from_binary(data)
473 1
        response.ResponseHeader.ServiceResult.check()
474 1
        return response.Results
475
476 1
    def call(self, methodstocall):
477 1
        request = ua.CallRequest()
478 1
        request.Parameters.MethodsToCall = methodstocall
479 1
        data = self._uasocket.send_request(request)
480 1
        response = ua.CallResponse.from_binary(data)
481 1
        response.ResponseHeader.ServiceResult.check()
482 1
        return response.Results
483
484 1
    def history_read(self, params):
485
        self.logger.info("history_read")
486
        request = ua.HistoryReadRequest()
487
        request.Parameters = params
488
        data = self._uasocket.send_request(request)
489
        response = ua.HistoryReadResponse.from_binary(data)
490
        response.ResponseHeader.ServiceResult.check()
491
        return response.Results
492