Completed
Pull Request — master (#70)
by Olivier
03:03
created

opcua.BinaryClient   A

Complexity

Total Complexity 34

Size/Duplication

Total Lines 237
Duplicated Lines 0 %

Test Coverage

Coverage 88.08%
Metric Value
wmc 34
dl 0
loc 237
ccs 170
cts 193
cp 0.8808
rs 9.2

27 Methods

Rating   Name   Duplication   Size   Complexity  
A close_secure_channel() 0 6 1
A delete_subscriptions() 0 11 3
A translate_browsepaths_to_nodeids() 0 8 1
A add_nodes() 0 8 1
A find_servers() 0 8 1
A send_hello() 0 2 1
A create_subscription() 0 10 2
A get_endpoints() 0 8 1
A register_server2() 0 8 1
A browse() 0 8 1
A publish() 0 7 2
A __init__() 0 6 1
A close_session() 0 6 1
A register_server() 0 7 1
A connect_socket() 0 6 1
A open_secure_channel() 0 2 1
A activate_session() 0 8 1
A create_monitored_items() 0 8 1
A write() 0 8 1
A _call_publish_callback() 0 14 4
A history_read() 0 8 1
A delete_monitored_items() 0 8 1
A read() 0 8 1
A find_servers_on_network() 0 8 1
A create_session() 0 9 1
A disconnect_socket() 0 2 1
A call() 0 7 1
1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9
10 1
import opcua.uaprotocol as ua
11 1
import opcua.utils as utils
12
13
14 1
class UASocketClient(object):
15
    """
16
    handle socket connection and send ua messages
17
    timeout is the timeout used while waiting for an ua answer from server
18
    """
19 1
    def __init__(self, timeout=1):
20 1
        self.logger = logging.getLogger(__name__ + "Socket")
21 1
        self._thread = None
22 1
        self._lock = Lock()
23 1
        self.timeout = timeout
24 1
        self._socket = None
25 1
        self._do_stop = False
26 1
        self._security_token = ua.ChannelSecurityToken()
27 1
        self.authentication_token = ua.NodeId()
28 1
        self._sequence_number = 0
29 1
        self._request_id = 0
30 1
        self._request_handle = 0
31 1
        self._callbackmap = {}
32
33 1
    def start(self):
34
        """
35
        Start receiving thread.
36
        this is called automatically in connect and
37
        should not be necessary to call directly
38
        """
39 1
        self._thread = Thread(target=self._run)
40 1
        self._thread.start()
41
42 1
    def send_request(self, request, callback=None, timeout=1000):
43
        """
44
        send request to server.
45
        timeout is the timeout written in ua header
46
        """
47
        # HACK to make sure we can convert our request to binary before increasing request counter etc ...
48 1
        request.to_binary()
49
        # END HACK
50 1
        with self._lock:
51 1
            request.RequestHeader = self._create_request_header(timeout)
52 1
            hdr = ua.Header(ua.MessageType.SecureMessage, ua.ChunkType.Single, self._security_token.ChannelId)
53 1
            symhdr = self._create_sym_algo_header()
54 1
            seqhdr = self._create_sequence_header()
55 1
            future = Future()
56 1
            if callback:
57 1
                future.add_done_callback(callback)
58 1
            self._callbackmap[seqhdr.RequestId] = future
59 1
            self._write_socket(hdr, symhdr, seqhdr, request)
60 1
        if not callback:
61 1
            data = future.result(self.timeout)
62 1
            self.check_answer(data, " in response to " + request.__class__.__name__)
63 1
            return data
64
65 1
    def check_answer(self, data, context):
66 1
        data = data.copy(50)  # FIXME check max length nodeid + responseheader
67 1
        typeid = ua.NodeId.from_binary(data)
68 1
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
69
            self.logger.warning("ServiceFault from server received %s", context)
70
            hdr = ua.ResponseHeader.from_binary(data)
71
            hdr.ServiceResult.check()
72
            return False
73 1
        return True
74
75 1
    def _run(self):
76 1
        self.logger.info("Thread started")
77 1
        while not self._do_stop:
78 1
            try:
79 1
                self._receive()
80 1
            except ua.utils.SocketClosedException:
81 1
                self.logger.info("Socket has closed connection")
82 1
                break
83 1
        self.logger.info("Thread ended")
84
85 1
    def _receive_header(self):
86 1
        self.logger.debug("Waiting for header")
87 1
        header = ua.Header.from_string(self._socket)
88 1
        self.logger.info("received header: %s", header)
89 1
        return header
90
91 1
    def _receive_body(self, size):
92 1
        self.logger.debug("reading body of message (%s bytes)", size)
93 1
        data = self._socket.read(size)
94 1
        if size != len(data):
95
            raise Exception("Error, did not receive expected number of bytes, got {}, asked for {}".format(len(data), size))
96 1
        return utils.Buffer(data)
97
98 1
    def _receive(self):
99 1
        body_chunk = b""
100 1
        while True:
101 1
            ret = self._receive_complete_msg()
102 1
            if ret is None:
103 1
                return
104 1
            hdr, algohdr, seqhdr, body = ret 
105 1
            if hdr.ChunkType in (b"F", b"A"):
106 1
                body.data = body_chunk + body.data
107 1
                break
108
            elif hdr.ChunkType == b"C":
109
                self.logger.debug("Received an intermediate message with header %s, waiting for next message", hdr)
110
                body_chunk += body.data
111
            else:
112
                self.logger.warning("Received a message with unknown ChunkType %s, in header %s", hdr.ChunkType, hdr)
113
                return
114 1
        self._call_callback(seqhdr.RequestId, body)
115
116 1
    def _receive_complete_msg(self):
117 1
        header = self._receive_header()
118 1
        if header is None:
119
            return None
120 1
        body = self._receive_body(header.body_size)
121 1
        if header.MessageType == ua.MessageType.Error:
122
            self.logger.warning("Received an error message type")
123
            err = ua.ErrorMessage.from_binary(body)
124
            self.logger.warning(err)
125
            return None
126 1
        if header.MessageType == ua.MessageType.Acknowledge:
127 1
            self._call_callback(0, body)
128 1
            return None
129 1
        elif header.MessageType == ua.MessageType.SecureOpen:
130 1
            algohdr = ua.AsymmetricAlgorithmHeader.from_binary(body)
131 1
            self.logger.info(algohdr)
132 1
        elif header.MessageType == ua.MessageType.SecureMessage:
133 1
            algohdr = ua.SymmetricAlgorithmHeader.from_binary(body)
134 1
            self.logger.info(algohdr)
135
        else:
136
            self.logger.warning("Unsupported message type: %s", header.MessageType)
137
            return None
138 1
        seqhdr = ua.SequenceHeader.from_binary(body)
139 1
        self.logger.debug(seqhdr)
140 1
        return header, algohdr, seqhdr, body
141
142 1
    def _call_callback(self, request_id, body):
143 1
        with self._lock:
144 1
            future = self._callbackmap.pop(request_id, None)
145 1
            if future is None:
146
                raise Exception("No future object found for request: {}, callbacks in list are {}".format(request_id, self._callbackmap.keys()))
147 1
        future.set_result(body)
148
149 1
    def _write_socket(self, hdr, *args):
150 1
        alle = []
151 1
        for arg in args:
152 1
            data = arg.to_binary()
153 1
            hdr.add_size(len(data))
154 1
            self.logger.debug("writting to socket: %s with length %s ", type(arg), len(data))
155 1
            self.logger.debug("struct: %s", arg)
156 1
            self.logger.debug("data: %s", data)
157 1
            alle.append(data)
158 1
        alle.insert(0, hdr.to_binary())
159 1
        alle = b"".join(alle)
160 1
        self._socket.write(alle)
161
162 1
    def _create_request_header(self, timeout=1000):
163 1
        hdr = ua.RequestHeader()
164 1
        hdr.AuthenticationToken = self.authentication_token
165 1
        self._request_handle += 1
166 1
        hdr.RequestHandle = self._request_handle
167 1
        hdr.TimeoutHint = timeout
168 1
        return hdr
169
170 1
    def _create_sym_algo_header(self):
171 1
        hdr = ua.SymmetricAlgorithmHeader()
172 1
        hdr.TokenId = self._security_token.TokenId
173 1
        return hdr
174
175 1
    def _create_sequence_header(self):
176 1
        hdr = ua.SequenceHeader()
177 1
        self._sequence_number += 1
178 1
        hdr.SequenceNumber = self._sequence_number
179 1
        self._request_id += 1
180 1
        hdr.RequestId = self._request_id
181 1
        return hdr
182
183 1
    def connect_socket(self, host, port):
184
        """
185
        connect to server socket and start receiving thread
186
        """
187 1
        self.logger.info("opening connection")
188 1
        sock = socket.create_connection((host, port))
189 1
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # nodelay ncessary to avoid packing in one frame, some servers do not like it
190 1
        self._socket = utils.SocketWrapper(sock)
191 1
        self.start()
192
193 1
    def disconnect_socket(self):
194 1
        self.logger.info("stop request")
195 1
        self._do_stop = True
196 1
        self._socket.socket.shutdown(socket.SHUT_WR)
197
198 1
    def send_hello(self, url):
199 1
        hello = ua.Hello()
200 1
        hello.EndpointUrl = url
201 1
        header = ua.Header(ua.MessageType.Hello, ua.ChunkType.Single)
202 1
        future = Future()
203 1
        with self._lock:
204 1
            self._callbackmap[0] = future
205 1
        self._write_socket(header, hello)
206 1
        return ua.Acknowledge.from_binary(future.result(self.timeout))
207
208 1
    def open_secure_channel(self, params):
209 1
        self.logger.info("open_secure_channel")
210 1
        with self._lock:
211 1
            request = ua.OpenSecureChannelRequest()
212 1
            request.Parameters = params
213 1
            request.RequestHeader = self._create_request_header()
214
215 1
            hdr = ua.Header(ua.MessageType.SecureOpen, ua.ChunkType.Single, self._security_token.ChannelId)
216 1
            asymhdr = ua.AsymmetricAlgorithmHeader()
217 1
            seqhdr = self._create_sequence_header()
218
219 1
            future = Future()
220 1
            self._callbackmap[seqhdr.RequestId] = future
221 1
            self._write_socket(hdr, asymhdr, seqhdr, request)
222
223 1
        response = ua.OpenSecureChannelResponse.from_binary(future.result(self.timeout))
224 1
        response.ResponseHeader.ServiceResult.check()
225 1
        self._security_token = response.Parameters.SecurityToken
226 1
        return response.Parameters
227
228 1
    def close_secure_channel(self):
229
        """
230
        close secure channel. It seems to trigger a shutdown of socket
231
        in most servers, so be prepare to reconnect
232
        """
233 1
        self.logger.info("get_endpoint")
234 1
        with self._lock:
235 1
            request = ua.CloseSecureChannelRequest()
236 1
            request.RequestHeader = self._create_request_header()
237
238 1
            hdr = ua.Header(ua.MessageType.SecureClose, ua.ChunkType.Single, self._security_token.ChannelId)
239 1
            symhdr = self._create_sym_algo_header()
240 1
            seqhdr = self._create_sequence_header()
241 1
            self._write_socket(hdr, symhdr, seqhdr, request)
242
243
        # some servers send a response here, most do not ... so we ignore
244
245
246
247 1
class BinaryClient(object):
248
249
    """
250
    low level OPC-UA client.
251
    implement all(well..one day) methods defined in opcua spec
252
    taking in argument the structures defined in opcua spec
253
    in python most of the structures are defined in
254
    uaprotocol_auto.py and uaprotocol_hand.py
255
    """
256
257 1
    def __init__(self, timeout=1):
258 1
        self.logger = logging.getLogger(__name__)
259 1
        self._publishcallbacks = {}
260 1
        self._lock = Lock()
261 1
        self._timeout = timeout
262 1
        self._uasocket = None
263
264 1
    def connect_socket(self, host, port):
265
        """
266
        connect to server socket and start receiving thread
267
        """
268 1
        self._uasocket = UASocketClient(self._timeout)
269 1
        return self._uasocket.connect_socket(host, port)
270
271 1
    def disconnect_socket(self):
272 1
        return self._uasocket.disconnect_socket()
273
274 1
    def send_hello(self, url):
275 1
        return self._uasocket.send_hello(url)
276
277 1
    def open_secure_channel(self, params):
278 1
        return self._uasocket.open_secure_channel(params)
279
280 1
    def close_secure_channel(self):
281
        """
282
        close secure channel. It seems to trigger a shutdown of socket
283
        in most servers, so be prepare to reconnect
284
        """
285 1
        return self._uasocket.close_secure_channel()
286
287 1
    def create_session(self, parameters):
288 1
        self.logger.info("create_session")
289 1
        request = ua.CreateSessionRequest()
290 1
        request.Parameters = parameters
291 1
        data = self._uasocket.send_request(request)
292 1
        response = ua.CreateSessionResponse.from_binary(data)
293 1
        response.ResponseHeader.ServiceResult.check()
294 1
        self._uasocket.authentication_token = response.Parameters.AuthenticationToken
295 1
        return response.Parameters
296
297 1
    def activate_session(self, parameters):
298 1
        self.logger.info("activate_session")
299 1
        request = ua.ActivateSessionRequest()
300 1
        request.Parameters = parameters
301 1
        data = self._uasocket.send_request(request)
302 1
        response = ua.ActivateSessionResponse.from_binary(data)
303 1
        response.ResponseHeader.ServiceResult.check()
304 1
        return response.Parameters
305
306 1
    def close_session(self, deletesubscriptions):
307 1
        self.logger.info("close_session")
308 1
        request = ua.CloseSessionRequest()
309 1
        request.DeleteSubscriptions = deletesubscriptions
310 1
        data = self._uasocket.send_request(request)
311 1
        ua.CloseSessionResponse.from_binary(data)
312
        # response.ResponseHeader.ServiceResult.check() #disabled, it seems we sent wrong session Id, but where is the sessionId supposed to be sent???
313
314 1
    def browse(self, parameters):
315 1
        self.logger.info("browse")
316 1
        request = ua.BrowseRequest()
317 1
        request.Parameters = parameters
318 1
        data = self._uasocket.send_request(request)
319 1
        response = ua.BrowseResponse.from_binary(data)
320 1
        response.ResponseHeader.ServiceResult.check()
321 1
        return response.Results
322
323 1
    def read(self, parameters):
324 1
        self.logger.info("read")
325 1
        request = ua.ReadRequest()
326 1
        request.Parameters = parameters
327 1
        data = self._uasocket.send_request(request)
328 1
        response = ua.ReadResponse.from_binary(data)
329 1
        response.ResponseHeader.ServiceResult.check()
330 1
        return response.Results
331
332 1
    def write(self, params):
333 1
        self.logger.info("read")
334 1
        request = ua.WriteRequest()
335 1
        request.Parameters = params
336 1
        data = self._uasocket.send_request(request)
337 1
        response = ua.WriteResponse.from_binary(data)
338 1
        response.ResponseHeader.ServiceResult.check()
339 1
        return response.Results
340
341 1
    def get_endpoints(self, params):
342 1
        self.logger.info("get_endpoint")
343 1
        request = ua.GetEndpointsRequest()
344 1
        request.Parameters = params
345 1
        data = self._uasocket.send_request(request)
346 1
        response = ua.GetEndpointsResponse.from_binary(data)
347 1
        response.ResponseHeader.ServiceResult.check()
348 1
        return response.Endpoints
349
350 1
    def find_servers(self, params):
351 1
        self.logger.info("find_servers")
352 1
        request = ua.FindServersRequest()
353 1
        request.Parameters = params
354 1
        data = self._uasocket.send_request(request)
355 1
        response = ua.FindServersResponse.from_binary(data)
356 1
        response.ResponseHeader.ServiceResult.check()
357 1
        return response.Servers
358
359 1
    def find_servers_on_network(self, params):
360
        self.logger.info("find_servers_on_network")
361
        request = ua.FindServersOnNetworkRequest()
362
        request.Parameters = params
363
        data = self._uasocket.send_request(request)
364
        response = ua.FindServersOnNetworkResponse.from_binary(data)
365
        response.ResponseHeader.ServiceResult.check()
366
        return response.Parameters
367
368 1
    def register_server(self, registered_server):
369 1
        self.logger.info("register_server")
370 1
        request = ua.RegisterServerRequest()
371 1
        request.Server = registered_server
372 1
        data = self._uasocket.send_request(request)
373 1
        response = ua.RegisterServerResponse.from_binary(data)
374 1
        response.ResponseHeader.ServiceResult.check()
375
        # nothing to return for this service
376
377 1
    def register_server2(self, params):
378
        self.logger.info("register_server2")
379
        request = ua.RegisterServer2Request()
380
        request.Parameters = params
381
        data = self._uasocket.send_request(request)
382
        response = ua.RegisterServer2Response.from_binary(data)
383
        response.ResponseHeader.ServiceResult.check()
384
        return response.ConfigurationResults
385
386 1
    def translate_browsepaths_to_nodeids(self, browsepaths):
387 1
        self.logger.info("translate_browsepath_to_nodeid")
388 1
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
389 1
        request.Parameters.BrowsePaths = browsepaths
390 1
        data = self._uasocket.send_request(request)
391 1
        response = ua.TranslateBrowsePathsToNodeIdsResponse.from_binary(data)
392 1
        response.ResponseHeader.ServiceResult.check()
393 1
        return response.Results
394
395 1
    def create_subscription(self, params, callback):
396 1
        self.logger.info("create_subscription")
397 1
        request = ua.CreateSubscriptionRequest()
398 1
        request.Parameters = params
399 1
        data = self._uasocket.send_request(request)
400 1
        response = ua.CreateSubscriptionResponse.from_binary(data)
401 1
        response.ResponseHeader.ServiceResult.check()
402 1
        with self._lock:
403 1
            self._publishcallbacks[response.Parameters.SubscriptionId] = callback
404 1
        return response.Parameters
405
406 1
    def delete_subscriptions(self, subscriptionids):
407 1
        self.logger.info("delete_subscription")
408 1
        request = ua.DeleteSubscriptionsRequest()
409 1
        request.Parameters.SubscriptionIds = subscriptionids
410 1
        data = self._uasocket.send_request(request)
411 1
        response = ua.DeleteSubscriptionsResponse.from_binary(data)
412 1
        response.ResponseHeader.ServiceResult.check()
413 1
        for sid in subscriptionids:
414 1
            with self._lock:
415 1
                self._publishcallbacks.pop(sid)
416 1
        return response.Results
417
418 1
    def publish(self, acks=None):
419 1
        self.logger.info("publish")
420 1
        if acks is None:
421 1
            acks = []
422 1
        request = ua.PublishRequest()
423 1
        request.Parameters.SubscriptionAcknowledgements = acks
424 1
        self._uasocket.send_request(request, self._call_publish_callback, timeout=int(9e8))  # timeout could be set to 0 but some servers to not support it
425
426 1
    def _call_publish_callback(self, future):
427 1
        self.logger.info("call_publish_callback")
428 1
        data = future.result()
429 1
        self._uasocket.check_answer(data, "ServiceFault received from server while waiting for publish response")
430 1
        response = ua.PublishResponse.from_binary(data)
431 1
        with self._lock:
432 1
            if response.Parameters.SubscriptionId not in self._publishcallbacks:
433 1
                self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
434 1
                return
435 1
            callback = self._publishcallbacks[response.Parameters.SubscriptionId]
436 1
        try:
437 1
            callback(response.Parameters)
438
        except Exception:  # we call client code, catch everything!
439
            self.logger.exception("Exception while calling user callback: %s")
440
441 1
    def create_monitored_items(self, params):
442 1
        self.logger.info("create_monitored_items")
443 1
        request = ua.CreateMonitoredItemsRequest()
444 1
        request.Parameters = params
445 1
        data = self._uasocket.send_request(request)
446 1
        response = ua.CreateMonitoredItemsResponse.from_binary(data)
447 1
        response.ResponseHeader.ServiceResult.check()
448 1
        return response.Results
449
450 1
    def delete_monitored_items(self, params):
451 1
        self.logger.info("delete_monitored_items")
452 1
        request = ua.DeleteMonitoredItemsRequest()
453 1
        request.Parameters = params
454 1
        data = self._uasocket.send_request(request)
455 1
        response = ua.DeleteMonitoredItemsResponse.from_binary(data)
456 1
        response.ResponseHeader.ServiceResult.check()
457 1
        return response.Results
458
459 1
    def add_nodes(self, nodestoadd):
460 1
        self.logger.info("add_nodes")
461 1
        request = ua.AddNodesRequest()
462 1
        request.Parameters.NodesToAdd = nodestoadd
463 1
        data = self._uasocket.send_request(request)
464 1
        response = ua.AddNodesResponse.from_binary(data)
465 1
        response.ResponseHeader.ServiceResult.check()
466 1
        return response.Results
467
468 1
    def call(self, methodstocall):
469 1
        request = ua.CallRequest()
470 1
        request.Parameters.MethodsToCall = methodstocall
471 1
        data = self._uasocket.send_request(request)
472 1
        response = ua.CallResponse.from_binary(data)
473 1
        response.ResponseHeader.ServiceResult.check()
474 1
        return response.Results
475
476 1
    def history_read(self, params):
477
        self.logger.info("history_read")
478
        request = ua.HistoryReadRequest()
479
        request.Parameters = params
480
        data = self._uasocket.send_request(request)
481
        response = ua.HistoryReadResponse.from_binary(data)
482
        response.ResponseHeader.ServiceResult.check()
483
        return response.Results
484