Completed
Push — dev ( 3d1b3a...abb445 )
by Olivier
02:17
created

opcua.CachedRequest   A

Complexity

Total Complexity 2

Size/Duplication

Total Lines 6
Duplicated Lines 0 %

Test Coverage

Coverage 100%
Metric Value
wmc 2
dl 0
loc 6
ccs 5
cts 5
cp 1
rs 10

2 Methods

Rating   Name   Duplication   Size   Complexity  
A __init__() 0 2 1
A to_binary() 0 2 1
1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9
10 1
import opcua.uaprotocol as ua
11 1
import opcua.utils as utils
12
13
14 1
class CachedRequest(object):
15 1
    def __init__(self, binary):
16 1
        self.binary = binary
17
18 1
    def to_binary(self):
19 1
        return self.binary
20
21
22 1
class UASocketClient(object):
23
    """
24
    handle socket connection and send ua messages
25
    timeout is the timeout used while waiting for an ua answer from server
26
    """
27 1
    def __init__(self, timeout=1):
28 1
        self.logger = logging.getLogger(__name__ + "Socket")
29 1
        self._thread = None
30 1
        self._lock = Lock()
31 1
        self.timeout = timeout
32 1
        self._socket = None
33 1
        self._do_stop = False
34 1
        self._security_token = ua.ChannelSecurityToken()
35 1
        self.authentication_token = ua.NodeId()
36 1
        self._sequence_number = 0
37 1
        self._request_id = 0
38 1
        self._request_handle = 0
39 1
        self._callbackmap = {}
40
41 1
    def start(self):
42
        """
43
        Start receiving thread.
44
        this is called automatically in connect and
45
        should not be necessary to call directly
46
        """
47 1
        self._thread = Thread(target=self._run)
48 1
        self._thread.start()
49
50 1
    def send_request(self, request, callback=None, timeout=1000):
51
        """
52
        send request to server.
53
        timeout is the timeout written in ua header
54
        """
55 1
        with self._lock:
56 1
            request.RequestHeader = self._create_request_header(timeout)
57 1
            try:
58 1
                cachedreq = CachedRequest(request.to_binary())
59
            except:
60
                # reset reqeust handle if any error
61
                # see self._create_request_header
62
                self._request_handle -= 1
63
                raise
64 1
            hdr = ua.Header(ua.MessageType.SecureMessage, ua.ChunkType.Single, self._security_token.ChannelId)
65 1
            symhdr = self._create_sym_algo_header()
66 1
            seqhdr = self._create_sequence_header()
67 1
            future = Future()
68 1
            if callback:
69 1
                future.add_done_callback(callback)
70 1
            self._callbackmap[seqhdr.RequestId] = future
71 1
            self._write_socket(hdr, symhdr, seqhdr, cachedreq)
72 1
        if not callback:
73 1
            data = future.result(self.timeout)
74 1
            self.check_answer(data, " in response to " + request.__class__.__name__)
75 1
            return data
76
77 1
    def check_answer(self, data, context):
78 1
        data = data.copy(50)  # FIXME check max length nodeid + responseheader
79 1
        typeid = ua.NodeId.from_binary(data)
80 1
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
81
            self.logger.warning("ServiceFault from server received %s", context)
82
            hdr = ua.ResponseHeader.from_binary(data)
83
            hdr.ServiceResult.check()
84
            return False
85 1
        return True
86
87 1
    def _run(self):
88 1
        self.logger.info("Thread started")
89 1
        while not self._do_stop:
90 1
            try:
91 1
                self._receive()
92 1
            except ua.utils.SocketClosedException:
93 1
                self.logger.info("Socket has closed connection")
94 1
                break
95 1
        self.logger.info("Thread ended")
96
97 1
    def _receive_header(self):
98 1
        self.logger.debug("Waiting for header")
99 1
        header = ua.Header.from_string(self._socket)
100 1
        self.logger.info("received header: %s", header)
101 1
        return header
102
103 1
    def _receive_body(self, size):
104 1
        self.logger.debug("reading body of message (%s bytes)", size)
105 1
        data = self._socket.read(size)
106 1
        if size != len(data):
107
            raise Exception("Error, did not receive expected number of bytes, got {}, asked for {}".format(len(data), size))
108 1
        return utils.Buffer(data)
109
110 1
    def _receive(self):
111 1
        body_chunk = b""
112 1
        while True:
113 1
            ret = self._receive_complete_msg()
114 1
            if ret is None:
115 1
                return
116 1
            hdr, algohdr, seqhdr, body = ret 
117 1
            if hdr.ChunkType in (b"F", b"A"):
118 1
                body.data = body_chunk + body.data
119 1
                break
120
            elif hdr.ChunkType == b"C":
121
                self.logger.debug("Received an intermediate message with header %s, waiting for next message", hdr)
122
                body_chunk += body.data
123
            else:
124
                self.logger.warning("Received a message with unknown ChunkType %s, in header %s", hdr.ChunkType, hdr)
125
                return
126 1
        self._call_callback(seqhdr.RequestId, body)
127
128 1
    def _receive_complete_msg(self):
129 1
        header = self._receive_header()
130 1
        if header is None:
131
            return None
132 1
        body = self._receive_body(header.body_size)
133 1
        if header.MessageType == ua.MessageType.Error:
134
            self.logger.warning("Received an error message type")
135
            err = ua.ErrorMessage.from_binary(body)
136
            self.logger.warning(err)
137
            return None
138 1
        if header.MessageType == ua.MessageType.Acknowledge:
139 1
            self._call_callback(0, body)
140 1
            return None
141 1
        elif header.MessageType == ua.MessageType.SecureOpen:
142 1
            algohdr = ua.AsymmetricAlgorithmHeader.from_binary(body)
143 1
            self.logger.info(algohdr)
144 1
        elif header.MessageType == ua.MessageType.SecureMessage:
145 1
            algohdr = ua.SymmetricAlgorithmHeader.from_binary(body)
146 1
            self.logger.info(algohdr)
147
        else:
148
            self.logger.warning("Unsupported message type: %s", header.MessageType)
149
            return None
150 1
        seqhdr = ua.SequenceHeader.from_binary(body)
151 1
        self.logger.debug(seqhdr)
152 1
        return header, algohdr, seqhdr, body
153
154 1
    def _call_callback(self, request_id, body):
155 1
        with self._lock:
156 1
            future = self._callbackmap.pop(request_id, None)
157 1
            if future is None:
158
                raise Exception("No future object found for request: {}, callbacks in list are {}".format(request_id, self._callbackmap.keys()))
159 1
        future.set_result(body)
160
161 1
    def _write_socket(self, hdr, *args):
162 1
        alle = []
163 1
        for arg in args:
164 1
            data = arg.to_binary()
165 1
            hdr.add_size(len(data))
166 1
            self.logger.debug("writting to socket: %s with length %s ", type(arg), len(data))
167 1
            self.logger.debug("struct: %s", arg)
168 1
            self.logger.debug("data: %s", data)
169 1
            alle.append(data)
170 1
        alle.insert(0, hdr.to_binary())
171 1
        alle = b"".join(alle)
172 1
        self._socket.write(alle)
173
174 1
    def _create_request_header(self, timeout=1000):
175 1
        hdr = ua.RequestHeader()
176 1
        hdr.AuthenticationToken = self.authentication_token
177 1
        self._request_handle += 1
178 1
        hdr.RequestHandle = self._request_handle
179 1
        hdr.TimeoutHint = timeout
180 1
        return hdr
181
182 1
    def _create_sym_algo_header(self):
183 1
        hdr = ua.SymmetricAlgorithmHeader()
184 1
        hdr.TokenId = self._security_token.TokenId
185 1
        return hdr
186
187 1
    def _create_sequence_header(self):
188 1
        hdr = ua.SequenceHeader()
189 1
        self._sequence_number += 1
190 1
        hdr.SequenceNumber = self._sequence_number
191 1
        self._request_id += 1
192 1
        hdr.RequestId = self._request_id
193 1
        return hdr
194
195 1
    def connect_socket(self, host, port):
196
        """
197
        connect to server socket and start receiving thread
198
        """
199 1
        self.logger.info("opening connection")
200 1
        sock = socket.create_connection((host, port))
201 1
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # nodelay ncessary to avoid packing in one frame, some servers do not like it
202 1
        self._socket = utils.SocketWrapper(sock)
203 1
        self.start()
204
205 1
    def disconnect_socket(self):
206 1
        self.logger.info("stop request")
207 1
        self._do_stop = True
208 1
        self._socket.socket.shutdown(socket.SHUT_WR)
209
210 1
    def send_hello(self, url):
211 1
        hello = ua.Hello()
212 1
        hello.EndpointUrl = url
213 1
        header = ua.Header(ua.MessageType.Hello, ua.ChunkType.Single)
214 1
        future = Future()
215 1
        with self._lock:
216 1
            self._callbackmap[0] = future
217 1
        self._write_socket(header, hello)
218 1
        return ua.Acknowledge.from_binary(future.result(self.timeout))
219
220 1
    def open_secure_channel(self, params):
221 1
        self.logger.info("open_secure_channel")
222 1
        with self._lock:
223 1
            request = ua.OpenSecureChannelRequest()
224 1
            request.Parameters = params
225 1
            request.RequestHeader = self._create_request_header()
226
227 1
            hdr = ua.Header(ua.MessageType.SecureOpen, ua.ChunkType.Single, self._security_token.ChannelId)
228 1
            asymhdr = ua.AsymmetricAlgorithmHeader()
229 1
            seqhdr = self._create_sequence_header()
230
231 1
            future = Future()
232 1
            self._callbackmap[seqhdr.RequestId] = future
233 1
            self._write_socket(hdr, asymhdr, seqhdr, request)
234
235 1
        response = ua.OpenSecureChannelResponse.from_binary(future.result(self.timeout))
236 1
        response.ResponseHeader.ServiceResult.check()
237 1
        self._security_token = response.Parameters.SecurityToken
238 1
        return response.Parameters
239
240 1
    def close_secure_channel(self):
241
        """
242
        close secure channel. It seems to trigger a shutdown of socket
243
        in most servers, so be prepare to reconnect
244
        """
245 1
        self.logger.info("get_endpoint")
246 1
        with self._lock:
247 1
            request = ua.CloseSecureChannelRequest()
248 1
            request.RequestHeader = self._create_request_header()
249
250 1
            hdr = ua.Header(ua.MessageType.SecureClose, ua.ChunkType.Single, self._security_token.ChannelId)
251 1
            symhdr = self._create_sym_algo_header()
252 1
            seqhdr = self._create_sequence_header()
253 1
            self._write_socket(hdr, symhdr, seqhdr, request)
254
255
        # some servers send a response here, most do not ... so we ignore
256
257
258
259 1
class BinaryClient(object):
260
261
    """
262
    low level OPC-UA client.
263
    implement all(well..one day) methods defined in opcua spec
264
    taking in argument the structures defined in opcua spec
265
    in python most of the structures are defined in
266
    uaprotocol_auto.py and uaprotocol_hand.py
267
    """
268
269 1
    def __init__(self, timeout=1):
270 1
        self.logger = logging.getLogger(__name__)
271 1
        self._publishcallbacks = {}
272 1
        self._lock = Lock()
273 1
        self._timeout = timeout
274 1
        self._uasocket = None
275
276 1
    def connect_socket(self, host, port):
277
        """
278
        connect to server socket and start receiving thread
279
        """
280 1
        self._uasocket = UASocketClient(self._timeout)
281 1
        return self._uasocket.connect_socket(host, port)
282
283 1
    def disconnect_socket(self):
284 1
        return self._uasocket.disconnect_socket()
285
286 1
    def send_hello(self, url):
287 1
        return self._uasocket.send_hello(url)
288
289 1
    def open_secure_channel(self, params):
290 1
        return self._uasocket.open_secure_channel(params)
291
292 1
    def close_secure_channel(self):
293
        """
294
        close secure channel. It seems to trigger a shutdown of socket
295
        in most servers, so be prepare to reconnect
296
        """
297 1
        return self._uasocket.close_secure_channel()
298
299 1
    def create_session(self, parameters):
300 1
        self.logger.info("create_session")
301 1
        request = ua.CreateSessionRequest()
302 1
        request.Parameters = parameters
303 1
        data = self._uasocket.send_request(request)
304 1
        response = ua.CreateSessionResponse.from_binary(data)
305 1
        response.ResponseHeader.ServiceResult.check()
306 1
        self._uasocket.authentication_token = response.Parameters.AuthenticationToken
307 1
        return response.Parameters
308
309 1
    def activate_session(self, parameters):
310 1
        self.logger.info("activate_session")
311 1
        request = ua.ActivateSessionRequest()
312 1
        request.Parameters = parameters
313 1
        data = self._uasocket.send_request(request)
314 1
        response = ua.ActivateSessionResponse.from_binary(data)
315 1
        response.ResponseHeader.ServiceResult.check()
316 1
        return response.Parameters
317
318 1
    def close_session(self, deletesubscriptions):
319 1
        self.logger.info("close_session")
320 1
        request = ua.CloseSessionRequest()
321 1
        request.DeleteSubscriptions = deletesubscriptions
322 1
        data = self._uasocket.send_request(request)
323 1
        ua.CloseSessionResponse.from_binary(data)
324
        # response.ResponseHeader.ServiceResult.check() #disabled, it seems we sent wrong session Id, but where is the sessionId supposed to be sent???
325
326 1
    def browse(self, parameters):
327 1
        self.logger.info("browse")
328 1
        request = ua.BrowseRequest()
329 1
        request.Parameters = parameters
330 1
        data = self._uasocket.send_request(request)
331 1
        response = ua.BrowseResponse.from_binary(data)
332 1
        response.ResponseHeader.ServiceResult.check()
333 1
        return response.Results
334
335 1
    def read(self, parameters):
336 1
        self.logger.info("read")
337 1
        request = ua.ReadRequest()
338 1
        request.Parameters = parameters
339 1
        data = self._uasocket.send_request(request)
340 1
        response = ua.ReadResponse.from_binary(data)
341 1
        response.ResponseHeader.ServiceResult.check()
342 1
        return response.Results
343
344 1
    def write(self, params):
345 1
        self.logger.info("read")
346 1
        request = ua.WriteRequest()
347 1
        request.Parameters = params
348 1
        data = self._uasocket.send_request(request)
349 1
        response = ua.WriteResponse.from_binary(data)
350 1
        response.ResponseHeader.ServiceResult.check()
351 1
        return response.Results
352
353 1
    def get_endpoints(self, params):
354 1
        self.logger.info("get_endpoint")
355 1
        request = ua.GetEndpointsRequest()
356 1
        request.Parameters = params
357 1
        data = self._uasocket.send_request(request)
358 1
        response = ua.GetEndpointsResponse.from_binary(data)
359 1
        response.ResponseHeader.ServiceResult.check()
360 1
        return response.Endpoints
361
362 1
    def find_servers(self, params):
363 1
        self.logger.info("find_servers")
364 1
        request = ua.FindServersRequest()
365 1
        request.Parameters = params
366 1
        data = self._uasocket.send_request(request)
367 1
        response = ua.FindServersResponse.from_binary(data)
368 1
        response.ResponseHeader.ServiceResult.check()
369 1
        return response.Servers
370
371 1
    def find_servers_on_network(self, params):
372
        self.logger.info("find_servers_on_network")
373
        request = ua.FindServersOnNetworkRequest()
374
        request.Parameters = params
375
        data = self._uasocket.send_request(request)
376
        response = ua.FindServersOnNetworkResponse.from_binary(data)
377
        response.ResponseHeader.ServiceResult.check()
378
        return response.Parameters
379
380 1
    def register_server(self, registered_server):
381 1
        self.logger.info("register_server")
382 1
        request = ua.RegisterServerRequest()
383 1
        request.Server = registered_server
384 1
        data = self._uasocket.send_request(request)
385 1
        response = ua.RegisterServerResponse.from_binary(data)
386 1
        response.ResponseHeader.ServiceResult.check()
387
        # nothing to return for this service
388
389 1
    def register_server2(self, params):
390
        self.logger.info("register_server2")
391
        request = ua.RegisterServer2Request()
392
        request.Parameters = params
393
        data = self._uasocket.send_request(request)
394
        response = ua.RegisterServer2Response.from_binary(data)
395
        response.ResponseHeader.ServiceResult.check()
396
        return response.ConfigurationResults
397
398 1
    def translate_browsepaths_to_nodeids(self, browsepaths):
399 1
        self.logger.info("translate_browsepath_to_nodeid")
400 1
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
401 1
        request.Parameters.BrowsePaths = browsepaths
402 1
        data = self._uasocket.send_request(request)
403 1
        response = ua.TranslateBrowsePathsToNodeIdsResponse.from_binary(data)
404 1
        response.ResponseHeader.ServiceResult.check()
405 1
        return response.Results
406
407 1
    def create_subscription(self, params, callback):
408 1
        self.logger.info("create_subscription")
409 1
        request = ua.CreateSubscriptionRequest()
410 1
        request.Parameters = params
411 1
        data = self._uasocket.send_request(request)
412 1
        response = ua.CreateSubscriptionResponse.from_binary(data)
413 1
        response.ResponseHeader.ServiceResult.check()
414 1
        with self._lock:
415 1
            self._publishcallbacks[response.Parameters.SubscriptionId] = callback
416 1
        return response.Parameters
417
418 1
    def delete_subscriptions(self, subscriptionids):
419 1
        self.logger.info("delete_subscription")
420 1
        request = ua.DeleteSubscriptionsRequest()
421 1
        request.Parameters.SubscriptionIds = subscriptionids
422 1
        data = self._uasocket.send_request(request)
423 1
        response = ua.DeleteSubscriptionsResponse.from_binary(data)
424 1
        response.ResponseHeader.ServiceResult.check()
425 1
        for sid in subscriptionids:
426 1
            with self._lock:
427 1
                self._publishcallbacks.pop(sid)
428 1
        return response.Results
429
430 1
    def publish(self, acks=None):
431 1
        self.logger.info("publish")
432 1
        if acks is None:
433 1
            acks = []
434 1
        request = ua.PublishRequest()
435 1
        request.Parameters.SubscriptionAcknowledgements = acks
436 1
        self._uasocket.send_request(request, self._call_publish_callback, timeout=int(9e8))  # timeout could be set to 0 but some servers to not support it
437
438 1
    def _call_publish_callback(self, future):
439 1
        self.logger.info("call_publish_callback")
440 1
        data = future.result()
441 1
        self._uasocket.check_answer(data, "ServiceFault received from server while waiting for publish response")
442 1
        response = ua.PublishResponse.from_binary(data)
443 1
        with self._lock:
444 1
            if response.Parameters.SubscriptionId not in self._publishcallbacks:
445 1
                self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
446 1
                return
447 1
            callback = self._publishcallbacks[response.Parameters.SubscriptionId]
448 1
        try:
449 1
            callback(response.Parameters)
450
        except Exception:  # we call client code, catch everything!
451
            self.logger.exception("Exception while calling user callback: %s")
452
453 1
    def create_monitored_items(self, params):
454 1
        self.logger.info("create_monitored_items")
455 1
        request = ua.CreateMonitoredItemsRequest()
456 1
        request.Parameters = params
457 1
        data = self._uasocket.send_request(request)
458 1
        response = ua.CreateMonitoredItemsResponse.from_binary(data)
459 1
        response.ResponseHeader.ServiceResult.check()
460 1
        return response.Results
461
462 1
    def delete_monitored_items(self, params):
463 1
        self.logger.info("delete_monitored_items")
464 1
        request = ua.DeleteMonitoredItemsRequest()
465 1
        request.Parameters = params
466 1
        data = self._uasocket.send_request(request)
467 1
        response = ua.DeleteMonitoredItemsResponse.from_binary(data)
468 1
        response.ResponseHeader.ServiceResult.check()
469 1
        return response.Results
470
471 1
    def add_nodes(self, nodestoadd):
472 1
        self.logger.info("add_nodes")
473 1
        request = ua.AddNodesRequest()
474 1
        request.Parameters.NodesToAdd = nodestoadd
475 1
        data = self._uasocket.send_request(request)
476 1
        response = ua.AddNodesResponse.from_binary(data)
477 1
        response.ResponseHeader.ServiceResult.check()
478 1
        return response.Results
479
480 1
    def call(self, methodstocall):
481 1
        request = ua.CallRequest()
482 1
        request.Parameters.MethodsToCall = methodstocall
483 1
        data = self._uasocket.send_request(request)
484 1
        response = ua.CallResponse.from_binary(data)
485 1
        response.ResponseHeader.ServiceResult.check()
486 1
        return response.Results
487
488 1
    def history_read(self, params):
489
        self.logger.info("history_read")
490
        request = ua.HistoryReadRequest()
491
        request.Parameters = params
492
        data = self._uasocket.send_request(request)
493
        response = ua.HistoryReadResponse.from_binary(data)
494
        response.ResponseHeader.ServiceResult.check()
495
        return response.Results
496