Completed
Pull Request — master (#77)
by
unknown
07:11
created

opcua.FakeRequest.to_binary()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1
Metric Value
dl 0
loc 2
ccs 2
cts 2
cp 1
rs 10
cc 1
crap 1
1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9
10 1
import opcua.uaprotocol as ua
11 1
import opcua.utils as utils
12
13
14 1
class FakeRequest(object):
    """
    Stand-in for a request whose binary form has already been computed.

    HACK to remove duplicate call Request.to_binary: send_request serializes
    the real request once and wraps the resulting bytes in this class, so that
    _write_socket (which calls to_binary() on every part it is given) does not
    serialize the request a second time.
    """

    def __init__(self, binary):
        # pre-serialized payload, handed back unchanged by to_binary()
        self.binary = binary

    def to_binary(self):
        """Return the payload given at construction, untouched."""
        return self.binary
21
22
23 1
class UASocketClient(object):
    """
    handle socket connection and send ua messages
    timeout is the timeout used while waiting for an ua answer from server
    """

    def __init__(self, timeout=1):
        self.logger = logging.getLogger(__name__ + "Socket")
        self._thread = None  # receiving thread, created in start()
        self._lock = Lock()  # guards the counters below and _callbackmap
        self.timeout = timeout
        self._socket = None  # utils.SocketWrapper, set in connect_socket()
        self._do_stop = False  # asks the receiving loop to exit
        self._security_token = ua.ChannelSecurityToken()
        self.authentication_token = ua.NodeId()
        self._sequence_number = 0
        self._request_id = 0
        self._request_handle = 0
        self._callbackmap = {}  # request id -> Future awaiting the response

    def start(self):
        """
        Start receiving thread.
        this is called automatically in connect and
        should not be necessary to call directly
        """
        self._thread = Thread(target=self._run)
        self._thread.start()

    def send_request(self, request, callback=None, timeout=1000):
        """
        send request to server.
        timeout is the timeout written in ua header

        If callback is given it is registered as done-callback of the
        response future and this method returns None as soon as the request
        has been written. Otherwise the call waits up to self.timeout for the
        response, runs check_answer on it and returns the response body.
        """
        with self._lock:
            request.RequestHeader = self._create_request_header(timeout)
            try:
                fakereq = FakeRequest(request.to_binary())
            except BaseException:
                # reset request handle if any error
                # see self._create_request_header
                self._request_handle -= 1
                raise
            hdr = ua.Header(ua.MessageType.SecureMessage, ua.ChunkType.Single, self._security_token.ChannelId)
            symhdr = self._create_sym_algo_header()
            seqhdr = self._create_sequence_header()
            future = Future()
            if callback:
                future.add_done_callback(callback)
            # register before writing so the receiving thread can always
            # find the future for this request id
            self._callbackmap[seqhdr.RequestId] = future
            self._write_socket(hdr, symhdr, seqhdr, fakereq)
        if not callback:
            # wait outside the lock: _call_callback needs it to deliver
            data = future.result(self.timeout)
            self.check_answer(data, " in response to " + request.__class__.__name__)
            return data

    def check_answer(self, data, context):
        """
        Inspect the type id at the start of a response body.

        Works on a copy of data so the caller's buffer is not consumed.
        When the type id is the ServiceFault encoding, a warning mentioning
        context is logged and the response header's ServiceResult.check() is
        called (presumably raising for a bad status -- confirm in ua);
        False is returned if that call returns. Otherwise returns True.
        """
        data = data.copy(50)  # FIXME check max length nodeid + responseheader
        typeid = ua.NodeId.from_binary(data)
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
            self.logger.warning("ServiceFault from server received %s", context)
            hdr = ua.ResponseHeader.from_binary(data)
            hdr.ServiceResult.check()
            return False
        return True

    def _run(self):
        """
        Body of the receiving thread: call _receive until a stop is
        requested or the socket reports that it was closed.
        """
        self.logger.info("Thread started")
        while not self._do_stop:
            try:
                self._receive()
            except ua.utils.SocketClosedException:
                self.logger.info("Socket has closed connection")
                break
        self.logger.info("Thread ended")

    def _receive_header(self):
        """Read and return the next message header from the socket."""
        self.logger.debug("Waiting for header")
        header = ua.Header.from_string(self._socket)
        self.logger.info("received header: %s", header)
        return header

    def _receive_body(self, size):
        """
        Read size bytes from the socket and return them in a utils.Buffer.
        Raises Exception if the number of bytes read differs from size.
        """
        self.logger.debug("reading body of message (%s bytes)", size)
        data = self._socket.read(size)
        if size != len(data):
            raise Exception("Error, did not receive expected number of bytes, got {}, asked for {}".format(len(data), size))
        return utils.Buffer(data)

    def _receive(self):
        """
        Receive one logical message and hand it to the waiting future.

        Bodies of b"C" chunks are accumulated until a chunk of type b"F" or
        b"A" arrives; the concatenated body is then delivered under the
        request id of that last chunk. Returns without delivering anything
        when _receive_complete_msg yields None or the chunk type is unknown.
        """
        body_chunk = b""
        while True:
            ret = self._receive_complete_msg()
            if ret is None:
                return
            hdr, _, seqhdr, body = ret
            if hdr.ChunkType in (b"F", b"A"):
                # NOTE(review): b"A" is treated exactly like b"F" here
                body.data = body_chunk + body.data
                break
            elif hdr.ChunkType == b"C":
                self.logger.debug("Received an intermediate message with header %s, waiting for next message", hdr)
                body_chunk += body.data
            else:
                self.logger.warning("Received a message with unknown ChunkType %s, in header %s", hdr.ChunkType, hdr)
                return
        self._call_callback(seqhdr.RequestId, body)

    def _receive_complete_msg(self):
        """
        Read one message (header and body) and decode its leading headers.

        Returns (header, algohdr, seqhdr, body) for SecureOpen and
        SecureMessage messages, body being positioned after the decoded
        headers. Returns None for Error messages (logged), for Acknowledge
        messages (delivered directly to the future stored under request
        id 0) and for unsupported message types.
        """
        header = self._receive_header()
        if header is None:
            return None
        body = self._receive_body(header.body_size)
        if header.MessageType == ua.MessageType.Error:
            self.logger.warning("Received an error message type")
            err = ua.ErrorMessage.from_binary(body)
            self.logger.warning(err)
            return None
        if header.MessageType == ua.MessageType.Acknowledge:
            # answer to send_hello, which registers its future under id 0
            self._call_callback(0, body)
            return None
        elif header.MessageType == ua.MessageType.SecureOpen:
            algohdr = ua.AsymmetricAlgorithmHeader.from_binary(body)
            self.logger.info(algohdr)
        elif header.MessageType == ua.MessageType.SecureMessage:
            algohdr = ua.SymmetricAlgorithmHeader.from_binary(body)
            self.logger.info(algohdr)
        else:
            self.logger.warning("Unsupported message type: %s", header.MessageType)
            return None
        seqhdr = ua.SequenceHeader.from_binary(body)
        self.logger.debug(seqhdr)
        return header, algohdr, seqhdr, body

    def _call_callback(self, request_id, body):
        """
        Remove the future registered for request_id and resolve it with body.
        Raises Exception when no future is registered under that id.
        """
        with self._lock:
            future = self._callbackmap.pop(request_id, None)
            if future is None:
                raise Exception("No future object found for request: {}, callbacks in list are {}".format(request_id, self._callbackmap.keys()))
        # resolved outside the lock: done-callbacks run here and may need it
        future.set_result(body)

    def _write_socket(self, hdr, *args):
        """
        Serialize every object in args, add each length to hdr, then write
        hdr followed by the serialized parts to the socket in one call.
        """
        parts = []
        for arg in args:
            data = arg.to_binary()
            hdr.add_size(len(data))
            self.logger.debug("writting to socket: %s with length %s ", type(arg), len(data))
            self.logger.debug("struct: %s", arg)
            self.logger.debug("data: %s", data)
            parts.append(data)
        # the header is serialized last, once all sizes have been added
        self._socket.write(hdr.to_binary() + b"".join(parts))

    def _create_request_header(self, timeout=1000):
        """
        Build a RequestHeader carrying the current authentication token, the
        next request handle and timeout as TimeoutHint.
        """
        hdr = ua.RequestHeader()
        hdr.AuthenticationToken = self.authentication_token
        self._request_handle += 1
        hdr.RequestHandle = self._request_handle
        hdr.TimeoutHint = timeout
        return hdr

    def _create_sym_algo_header(self):
        """Build a SymmetricAlgorithmHeader for the current security token."""
        hdr = ua.SymmetricAlgorithmHeader()
        hdr.TokenId = self._security_token.TokenId
        return hdr

    def _create_sequence_header(self):
        """
        Build a SequenceHeader with the next sequence number and the next
        request id (both counters are incremented).
        """
        hdr = ua.SequenceHeader()
        self._sequence_number += 1
        hdr.SequenceNumber = self._sequence_number
        self._request_id += 1
        hdr.RequestId = self._request_id
        return hdr

    def connect_socket(self, host, port):
        """
        connect to server socket and start receiving thread
        """
        self.logger.info("opening connection")
        sock = socket.create_connection((host, port))
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # nodelay ncessary to avoid packing in one frame, some servers do not like it
        self._socket = utils.SocketWrapper(sock)
        self.start()

    def disconnect_socket(self):
        """
        Ask the receiving loop to stop and shut down the write side of the
        socket.
        """
        self.logger.info("stop request")
        self._do_stop = True
        self._socket.socket.shutdown(socket.SHUT_WR)

    def send_hello(self, url):
        """
        Send a Hello message for url and return the decoded Acknowledge,
        waiting up to self.timeout for it.
        """
        hello = ua.Hello()
        hello.EndpointUrl = url
        header = ua.Header(ua.MessageType.Hello, ua.ChunkType.Single)
        future = Future()
        with self._lock:
            # the Acknowledge is delivered under request id 0
            self._callbackmap[0] = future
        self._write_socket(header, hello)
        return ua.Acknowledge.from_binary(future.result(self.timeout))

    def open_secure_channel(self, params):
        """
        Send an OpenSecureChannelRequest built from params, wait up to
        self.timeout for the response, check its service result, store the
        returned security token and return the response parameters.
        """
        self.logger.info("open_secure_channel")
        with self._lock:
            request = ua.OpenSecureChannelRequest()
            request.Parameters = params
            request.RequestHeader = self._create_request_header()

            hdr = ua.Header(ua.MessageType.SecureOpen, ua.ChunkType.Single, self._security_token.ChannelId)
            asymhdr = ua.AsymmetricAlgorithmHeader()
            seqhdr = self._create_sequence_header()

            future = Future()
            self._callbackmap[seqhdr.RequestId] = future
            self._write_socket(hdr, asymhdr, seqhdr, request)

        response = ua.OpenSecureChannelResponse.from_binary(future.result(self.timeout))
        response.ResponseHeader.ServiceResult.check()
        self._security_token = response.Parameters.SecurityToken
        return response.Parameters

    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepare to reconnect
        """
        self.logger.info("close_secure_channel")
        with self._lock:
            request = ua.CloseSecureChannelRequest()
            request.RequestHeader = self._create_request_header()

            hdr = ua.Header(ua.MessageType.SecureClose, ua.ChunkType.Single, self._security_token.ChannelId)
            symhdr = self._create_sym_algo_header()
            seqhdr = self._create_sequence_header()
            self._write_socket(hdr, symhdr, seqhdr, request)

        # some servers send a response here, most do not ... so we ignore
257
258
259
260 1
class BinaryClient(object):

    """
    low level OPC-UA client.
    implement all(well..one day) methods defined in opcua spec
    taking in argument the structures defined in opcua spec
    in python most of the structures are defined in
    uaprotocol_auto.py and uaprotocol_hand.py

    Unless noted otherwise, each service method builds the matching request,
    sends it through the UASocketClient created in connect_socket, decodes
    the response, calls ResponseHeader.ServiceResult.check() on it and
    returns the relevant part of the response.
    """

    def __init__(self, timeout=1):
        self.logger = logging.getLogger(__name__)
        self._publishcallbacks = {}  # subscription id -> user callback
        self._lock = Lock()  # guards _publishcallbacks
        self._timeout = timeout  # handed to UASocketClient
        self._uasocket = None  # created in connect_socket()

    def connect_socket(self, host, port):
        """
        connect to server socket and start receiving thread
        """
        self._uasocket = UASocketClient(self._timeout)
        return self._uasocket.connect_socket(host, port)

    def disconnect_socket(self):
        """Delegate to UASocketClient.disconnect_socket."""
        return self._uasocket.disconnect_socket()

    def send_hello(self, url):
        """Delegate to UASocketClient.send_hello."""
        return self._uasocket.send_hello(url)

    def open_secure_channel(self, params):
        """Delegate to UASocketClient.open_secure_channel."""
        return self._uasocket.open_secure_channel(params)

    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepare to reconnect
        """
        return self._uasocket.close_secure_channel()

    def create_session(self, parameters):
        """
        Send a CreateSessionRequest and return the response parameters.
        The returned AuthenticationToken is stored on the socket client so
        that it is written in the header of later requests.
        """
        self.logger.info("create_session")
        request = ua.CreateSessionRequest()
        request.Parameters = parameters
        data = self._uasocket.send_request(request)
        response = ua.CreateSessionResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        self._uasocket.authentication_token = response.Parameters.AuthenticationToken
        return response.Parameters

    def activate_session(self, parameters):
        """Send an ActivateSessionRequest and return the response parameters."""
        self.logger.info("activate_session")
        request = ua.ActivateSessionRequest()
        request.Parameters = parameters
        data = self._uasocket.send_request(request)
        response = ua.ActivateSessionResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters

    def close_session(self, deletesubscriptions):
        """
        Send a CloseSessionRequest. The response is decoded but its service
        result is deliberately not checked (see comment below); returns None.
        """
        self.logger.info("close_session")
        request = ua.CloseSessionRequest()
        request.DeleteSubscriptions = deletesubscriptions
        data = self._uasocket.send_request(request)
        ua.CloseSessionResponse.from_binary(data)
        # response.ResponseHeader.ServiceResult.check() #disabled, it seems we sent wrong session Id, but where is the sessionId supposed to be sent???

    def browse(self, parameters):
        """Send a BrowseRequest and return the response results."""
        self.logger.info("browse")
        request = ua.BrowseRequest()
        request.Parameters = parameters
        data = self._uasocket.send_request(request)
        response = ua.BrowseResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def read(self, parameters):
        """Send a ReadRequest and return the response results."""
        self.logger.info("read")
        request = ua.ReadRequest()
        request.Parameters = parameters
        data = self._uasocket.send_request(request)
        response = ua.ReadResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def write(self, params):
        """Send a WriteRequest and return the response results."""
        self.logger.info("write")
        request = ua.WriteRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.WriteResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def get_endpoints(self, params):
        """Send a GetEndpointsRequest and return the response endpoints."""
        self.logger.info("get_endpoint")
        request = ua.GetEndpointsRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.GetEndpointsResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Endpoints

    def find_servers(self, params):
        """Send a FindServersRequest and return the response servers."""
        self.logger.info("find_servers")
        request = ua.FindServersRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.FindServersResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Servers

    def find_servers_on_network(self, params):
        """Send a FindServersOnNetworkRequest and return the response parameters."""
        self.logger.info("find_servers_on_network")
        request = ua.FindServersOnNetworkRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.FindServersOnNetworkResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters

    def register_server(self, registered_server):
        """Send a RegisterServerRequest for registered_server; returns None."""
        self.logger.info("register_server")
        request = ua.RegisterServerRequest()
        request.Server = registered_server
        data = self._uasocket.send_request(request)
        response = ua.RegisterServerResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        # nothing to return for this service

    def register_server2(self, params):
        """Send a RegisterServer2Request and return the configuration results."""
        self.logger.info("register_server2")
        request = ua.RegisterServer2Request()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.RegisterServer2Response.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.ConfigurationResults

    def translate_browsepaths_to_nodeids(self, browsepaths):
        """
        Send a TranslateBrowsePathsToNodeIdsRequest for browsepaths and
        return the response results.
        """
        self.logger.info("translate_browsepath_to_nodeid")
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
        request.Parameters.BrowsePaths = browsepaths
        data = self._uasocket.send_request(request)
        response = ua.TranslateBrowsePathsToNodeIdsResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def create_subscription(self, params, callback):
        """
        Send a CreateSubscriptionRequest and return the response parameters.
        callback is remembered under the new subscription id and is later
        called with the parameters of publish responses for that id.
        """
        self.logger.info("create_subscription")
        request = ua.CreateSubscriptionRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.CreateSubscriptionResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        with self._lock:
            self._publishcallbacks[response.Parameters.SubscriptionId] = callback
        return response.Parameters

    def delete_subscriptions(self, subscriptionids):
        """
        Send a DeleteSubscriptionsRequest for subscriptionids, forget the
        callbacks registered for them and return the response results.
        """
        self.logger.info("delete_subscription")
        request = ua.DeleteSubscriptionsRequest()
        request.Parameters.SubscriptionIds = subscriptionids
        data = self._uasocket.send_request(request)
        response = ua.DeleteSubscriptionsResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        with self._lock:
            for sid in subscriptionids:
                # ids without a registered callback are ignored: the server
                # already handled the request and the results must be returned
                self._publishcallbacks.pop(sid, None)
        return response.Results

    def publish(self, acks=None):
        """
        Send a PublishRequest acknowledging acks (none by default).
        Does not wait: the response is handled by _call_publish_callback.
        """
        self.logger.info("publish")
        if acks is None:
            acks = []
        request = ua.PublishRequest()
        request.Parameters.SubscriptionAcknowledgements = acks
        self._uasocket.send_request(request, self._call_publish_callback, timeout=int(9e8))  # timeout could be set to 0 but some servers to not support it

    def _call_publish_callback(self, future):
        """
        Done-callback for publish requests: decode the PublishResponse held
        by future and pass its parameters to the callback registered for the
        subscription. Responses for unknown subscriptions are logged and
        dropped; exceptions raised by the user callback are logged.
        """
        self.logger.info("call_publish_callback")
        data = future.result()
        self._uasocket.check_answer(data, "ServiceFault received from server while waiting for publish response")
        response = ua.PublishResponse.from_binary(data)
        with self._lock:
            if response.Parameters.SubscriptionId not in self._publishcallbacks:
                self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
                return
            callback = self._publishcallbacks[response.Parameters.SubscriptionId]
        # user code is called outside the lock
        try:
            callback(response.Parameters)
        except Exception:  # we call client code, catch everything!
            self.logger.exception("Exception while calling user callback")

    def create_monitored_items(self, params):
        """Send a CreateMonitoredItemsRequest and return the response results."""
        self.logger.info("create_monitored_items")
        request = ua.CreateMonitoredItemsRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.CreateMonitoredItemsResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def delete_monitored_items(self, params):
        """Send a DeleteMonitoredItemsRequest and return the response results."""
        self.logger.info("delete_monitored_items")
        request = ua.DeleteMonitoredItemsRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.DeleteMonitoredItemsResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def add_nodes(self, nodestoadd):
        """Send an AddNodesRequest for nodestoadd and return the response results."""
        self.logger.info("add_nodes")
        request = ua.AddNodesRequest()
        request.Parameters.NodesToAdd = nodestoadd
        data = self._uasocket.send_request(request)
        response = ua.AddNodesResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def call(self, methodstocall):
        """Send a CallRequest for methodstocall and return the response results."""
        self.logger.info("call")
        request = ua.CallRequest()
        request.Parameters.MethodsToCall = methodstocall
        data = self._uasocket.send_request(request)
        response = ua.CallResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def history_read(self, params):
        """Send a HistoryReadRequest and return the response results."""
        self.logger.info("history_read")
        request = ua.HistoryReadRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.HistoryReadResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results
497