Completed
Push — dev ( a35960...17cd27 )
by Olivier
02:25
created

opcua.BinaryClient.__init__()   A

Complexity

Conditions 1

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 1
Metric Value
dl 0
loc 5
ccs 5
cts 5
cp 1
rs 9.4286
cc 1
crap 1
1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9
10 1
import opcua.uaprotocol as ua
11 1
import opcua.utils as utils
12
13
14 1
class UASocketClient(object):
15
    """
16
    handle socket connection and send ua messages
17
    timeout is the timeout used while waiting for an ua answer from server
18
    """
19 1
    def __init__(self, timeout=1):
20 1
        self.logger = logging.getLogger(__name__ + "Socket")
21 1
        self._thread = None
22 1
        self._lock = Lock()
23 1
        self.timeout = timeout
24 1
        self._socket = None
25 1
        self._do_stop = False
26 1
        self._security_token = ua.ChannelSecurityToken()
27 1
        self.authentication_token = ua.NodeId()
28 1
        self._sequence_number = 0
29 1
        self._request_id = 0
30 1
        self._request_handle = 0
31 1
        self._callbackmap = {}
32
33 1
    def start(self):
34
        """
35
        Start receiving thread.
36
        this is called automatically in connect and
37
        should not be necessary to call directly
38
        """
39 1
        self._thread = Thread(target=self._run)
40 1
        self._thread.start()
41
42 1
    def send_request(self, request, callback=None, timeout=1000):
43
        """
44
        send request to server.
45
        timeout is the timeout written in ua header
46
        """
47
        # HACK to make sure we can convert our request to binary before increasing request counter etc ...
48 1
        request.to_binary()
49
        # END HACK
50 1
        with self._lock:
51 1
            request.RequestHeader = self._create_request_header(timeout)
52 1
            hdr = ua.Header(ua.MessageType.SecureMessage, ua.ChunkType.Single, self._security_token.ChannelId)
53 1
            symhdr = self._create_sym_algo_header()
54 1
            seqhdr = self._create_sequence_header()
55 1
            future = Future()
56 1
            if callback:
57 1
                future.add_done_callback(callback)
58 1
            self._callbackmap[seqhdr.RequestId] = future
59 1
            self._write_socket(hdr, symhdr, seqhdr, request)
60 1
        if not callback:
61 1
            data = future.result(self.timeout)
62 1
            self.check_answer(data, " in response to " + request.__class__.__name__)
63 1
            return data
64
65 1
    def check_answer(self, data, context):
66 1
        data = data.copy(50)  # FIXME check max length nodeid + responseheader
67 1
        typeid = ua.NodeId.from_binary(data)
68 1
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
69
            self.logger.warning("ServiceFault from server received %s", context)
70
            hdr = ua.ResponseHeader.from_binary(data)
71
            hdr.ServiceResult.check()
72
            return False
73 1
        return True
74
75 1
    def _run(self):
76 1
        self.logger.info("Thread started")
77 1
        while not self._do_stop:
78 1
            try:
79 1
                self._receive()
80 1
            except ua.utils.SocketClosedException:
81 1
                self.logger.info("Socket has closed connection")
82 1
                break
83 1
        self.logger.info("Thread ended")
84
85 1
    def _receive_header(self):
86 1
        self.logger.debug("Waiting for header")
87 1
        header = ua.Header.from_string(self._socket)
88 1
        self.logger.info("received header: %s", header)
89 1
        return header
90
91 1
    def _receive_body(self, size):
92 1
        self.logger.debug("reading body of message (%s bytes)", size)
93 1
        data = self._socket.read(size)
94 1
        if size != len(data):
95
            raise Exception("Error, did not receive expected number of bytes, got {}, asked for {}".format(len(data), size))
96 1
        return utils.Buffer(data)
97
98 1
    def _receive(self):
99 1
        body_chunk = b""
100 1
        while True:
101 1
            ret = self._receive_complete_msg()
102 1
            if ret is None:
103 1
                return
104 1
            hdr, algohdr, seqhdr, body = ret 
105 1
            if hdr.ChunkType in (b"F", b"A"):
106 1
                body.data = body_chunk + body.data
107 1
                break
108
            elif hdr.ChunkType == b"C":
109
                self.logger.debug("Received an intermediate message with header %s, waiting for next message", hdr)
110
                body_chunk += body.data
111
            else:
112
                self.logger.warning("Received a message with unknown ChunkType %s, in header %s", hdr.ChunkType, hdr)
113
                return
114 1
        self._call_callback(seqhdr.RequestId, body)
115
116 1
    def _receive_complete_msg(self):
117 1
        header = self._receive_header()
118 1
        if header is None:
119
            return None
120 1
        body = self._receive_body(header.body_size)
121 1
        if header.MessageType == ua.MessageType.Error:
122
            self.logger.warning("Received an error message type")
123
            err = ua.ErrorMessage.from_binary(body)
124
            self.logger.warning(err)
125
            return None
126 1
        if header.MessageType == ua.MessageType.Acknowledge:
127 1
            self._call_callback(0, body)
128 1
            return None
129 1
        elif header.MessageType == ua.MessageType.SecureOpen:
130 1
            algohdr = ua.AsymmetricAlgorithmHeader.from_binary(body)
131 1
            self.logger.info(algohdr)
132 1
        elif header.MessageType == ua.MessageType.SecureMessage:
133 1
            algohdr = ua.SymmetricAlgorithmHeader.from_binary(body)
134 1
            self.logger.info(algohdr)
135
        else:
136
            self.logger.warning("Unsupported message type: %s", header.MessageType)
137
            return None
138 1
        seqhdr = ua.SequenceHeader.from_binary(body)
139 1
        self.logger.debug(seqhdr)
140 1
        return header, algohdr, seqhdr, body
141
142 1
    def _call_callback(self, request_id, body):
143 1
        with self._lock:
144 1
            future = self._callbackmap.pop(request_id, None)
145 1
            if future is None:
146
                raise Exception("No future object found for request: {}, callbacks in list are {}".format(request_id, self._callbackmap.keys()))
147 1
        future.set_result(body)
148
149 1
    def _write_socket(self, hdr, *args):
150 1
        alle = []
151 1
        for arg in args:
152 1
            data = arg.to_binary()
153 1
            hdr.add_size(len(data))
154 1
            self.logger.debug("writting to socket: %s with length %s ", type(arg), len(data))
155 1
            self.logger.debug("struct: %s", arg)
156 1
            self.logger.debug("data: %s", data)
157 1
            alle.append(data)
158 1
        alle.insert(0, hdr.to_binary())
159 1
        alle = b"".join(alle)
160 1
        self._socket.write(alle)
161
162 1
    def _create_request_header(self, timeout=1000):
163 1
        hdr = ua.RequestHeader()
164 1
        hdr.AuthenticationToken = self.authentication_token
165 1
        self._request_handle += 1
166 1
        hdr.RequestHandle = self._request_handle
167 1
        hdr.TimeoutHint = timeout
168 1
        return hdr
169
170 1
    def _create_sym_algo_header(self):
171 1
        hdr = ua.SymmetricAlgorithmHeader()
172 1
        hdr.TokenId = self._security_token.TokenId
173 1
        return hdr
174
175 1
    def _create_sequence_header(self):
176 1
        hdr = ua.SequenceHeader()
177 1
        self._sequence_number += 1
178 1
        hdr.SequenceNumber = self._sequence_number
179 1
        self._request_id += 1
180 1
        hdr.RequestId = self._request_id
181 1
        return hdr
182
183 1
    def connect_socket(self, host, port):
184
        """
185
        connect to server socket and start receiving thread
186
        """
187 1
        self.logger.info("opening connection")
188 1
        sock = socket.create_connection((host, port))
189 1
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # nodelay ncessary to avoid packing in one frame, some servers do not like it
190 1
        self._socket = utils.SocketWrapper(sock)
191 1
        self.start()
192
193 1
    def disconnect_socket(self):
194 1
        self.logger.info("stop request")
195 1
        self._do_stop = True
196 1
        self._socket.socket.shutdown(socket.SHUT_WR)
197
198 1
    def send_hello(self, url):
199 1
        hello = ua.Hello()
200 1
        hello.EndpointUrl = url
201 1
        header = ua.Header(ua.MessageType.Hello, ua.ChunkType.Single)
202 1
        future = Future()
203 1
        with self._lock:
204 1
            self._callbackmap[0] = future
205 1
        self._write_socket(header, hello)
206 1
        return ua.Acknowledge.from_binary(future.result(self.timeout))
207
208 1
    def open_secure_channel(self, params):
209 1
        self.logger.info("open_secure_channel")
210 1
        with self._lock:
211 1
            request = ua.OpenSecureChannelRequest()
212 1
            request.Parameters = params
213 1
            request.RequestHeader = self._create_request_header()
214
215 1
            hdr = ua.Header(ua.MessageType.SecureOpen, ua.ChunkType.Single, self._security_token.ChannelId)
216 1
            asymhdr = ua.AsymmetricAlgorithmHeader()
217 1
            seqhdr = self._create_sequence_header()
218
219 1
            future = Future()
220 1
            self._callbackmap[seqhdr.RequestId] = future
221 1
            self._write_socket(hdr, asymhdr, seqhdr, request)
222
223 1
        response = ua.OpenSecureChannelResponse.from_binary(future.result(self.timeout))
224 1
        response.ResponseHeader.ServiceResult.check()
225 1
        self._security_token = response.Parameters.SecurityToken
226 1
        return response.Parameters
227
228 1
    def close_secure_channel(self):
229
        """
230
        close secure channel. It seems to trigger a shutdown of socket
231
        in most servers, so be prepare to reconnect
232
        """
233 1
        self.logger.info("get_endpoint")
234 1
        with self._lock:
235 1
            request = ua.CloseSecureChannelRequest()
236 1
            request.RequestHeader = self._create_request_header()
237
238 1
            hdr = ua.Header(ua.MessageType.SecureClose, ua.ChunkType.Single, self._security_token.ChannelId)
239 1
            symhdr = self._create_sym_algo_header()
240 1
            seqhdr = self._create_sequence_header()
241 1
            self._write_socket(hdr, symhdr, seqhdr, request)
242
243
        # some servers send a response here, most do not ... so we ignore
244
245
246
247 1
class BinaryClient(object):
248
249
    """
250
    low level OPC-UA client.
251
    implement all(well..one day) methods defined in opcua spec
252
    taking in argument the structures defined in opcua spec
253
    in python most of the structures are defined in
254
    uaprotocol_auto.py and uaprotocol_hand.py
255
    """
256
257 1
    def __init__(self, timeout=1):
258 1
        self.logger = logging.getLogger(__name__)
259 1
        self._publishcallbacks = {}
260 1
        self._lock = Lock()
261 1
        self._uasocket = UASocketClient(timeout)
262
263 1
    def connect_socket(self, host, port):
264
        """
265
        connect to server socket and start receiving thread
266
        """
267 1
        return self._uasocket.connect_socket(host, port)
268
269 1
    def disconnect_socket(self):
270 1
        return self._uasocket.disconnect_socket()
271
272 1
    def send_hello(self, url):
273 1
        return self._uasocket.send_hello(url)
274
275 1
    def open_secure_channel(self, params):
276 1
        return self._uasocket.open_secure_channel(params)
277
278 1
    def close_secure_channel(self):
279
        """
280
        close secure channel. It seems to trigger a shutdown of socket
281
        in most servers, so be prepare to reconnect
282
        """
283 1
        return self._uasocket.close_secure_channel()
284
285 1
    def create_session(self, parameters):
286 1
        self.logger.info("create_session")
287 1
        request = ua.CreateSessionRequest()
288 1
        request.Parameters = parameters
289 1
        data = self._uasocket.send_request(request)
290 1
        response = ua.CreateSessionResponse.from_binary(data)
291 1
        response.ResponseHeader.ServiceResult.check()
292 1
        self._uasocket.authentication_token = response.Parameters.AuthenticationToken
293 1
        return response.Parameters
294
295 1
    def activate_session(self, parameters):
296 1
        self.logger.info("activate_session")
297 1
        request = ua.ActivateSessionRequest()
298 1
        request.Parameters = parameters
299 1
        data = self._uasocket.send_request(request)
300 1
        response = ua.ActivateSessionResponse.from_binary(data)
301 1
        response.ResponseHeader.ServiceResult.check()
302 1
        return response.Parameters
303
304 1
    def close_session(self, deletesubscriptions):
305 1
        self.logger.info("close_session")
306 1
        request = ua.CloseSessionRequest()
307 1
        request.DeleteSubscriptions = deletesubscriptions
308 1
        data = self._uasocket.send_request(request)
309 1
        ua.CloseSessionResponse.from_binary(data)
310
        # response.ResponseHeader.ServiceResult.check() #disabled, it seems we sent wrong session Id, but where is the sessionId supposed to be sent???
311
312 1
    def browse(self, parameters):
313 1
        self.logger.info("browse")
314 1
        request = ua.BrowseRequest()
315 1
        request.Parameters = parameters
316 1
        data = self._uasocket.send_request(request)
317 1
        response = ua.BrowseResponse.from_binary(data)
318 1
        response.ResponseHeader.ServiceResult.check()
319 1
        return response.Results
320
321 1
    def read(self, parameters):
322 1
        self.logger.info("read")
323 1
        request = ua.ReadRequest()
324 1
        request.Parameters = parameters
325 1
        data = self._uasocket.send_request(request)
326 1
        response = ua.ReadResponse.from_binary(data)
327 1
        response.ResponseHeader.ServiceResult.check()
328 1
        return response.Results
329
330 1
    def write(self, params):
331 1
        self.logger.info("read")
332 1
        request = ua.WriteRequest()
333 1
        request.Parameters = params
334 1
        data = self._uasocket.send_request(request)
335 1
        response = ua.WriteResponse.from_binary(data)
336 1
        response.ResponseHeader.ServiceResult.check()
337 1
        return response.Results
338
339 1
    def get_endpoints(self, params):
340 1
        self.logger.info("get_endpoint")
341 1
        request = ua.GetEndpointsRequest()
342 1
        request.Parameters = params
343 1
        data = self._uasocket.send_request(request)
344 1
        response = ua.GetEndpointsResponse.from_binary(data)
345 1
        response.ResponseHeader.ServiceResult.check()
346 1
        return response.Endpoints
347
348 1
    def find_servers(self, params):
349
        self.logger.info("find_servers")
350
        request = ua.FindServersRequest()
351
        request.Parameters = params
352
        data = self._uasocket.send_request(request)
353
        response = ua.FindServersResponse.from_binary(data)
354
        response.ResponseHeader.ServiceResult.check()
355
        return response.Servers
356
357 1
    def find_servers_on_network(self, params):
358
        self.logger.info("find_servers_on_network")
359
        request = ua.FindServersOnNetworkRequest()
360
        request.Parameters = params
361
        data = self._uasocket.send_request(request)
362
        response = ua.FindServersOnNetworkResponse.from_binary(data)
363
        response.ResponseHeader.ServiceResult.check()
364
        return response.Servers
365
366 1
    def translate_browsepaths_to_nodeids(self, browsepaths):
367 1
        self.logger.info("translate_browsepath_to_nodeid")
368 1
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
369 1
        request.Parameters.BrowsePaths = browsepaths
370 1
        data = self._uasocket.send_request(request)
371 1
        response = ua.TranslateBrowsePathsToNodeIdsResponse.from_binary(data)
372 1
        response.ResponseHeader.ServiceResult.check()
373 1
        return response.Results
374
375 1
    def create_subscription(self, params, callback):
376 1
        self.logger.info("create_subscription")
377 1
        request = ua.CreateSubscriptionRequest()
378 1
        request.Parameters = params
379 1
        data = self._uasocket.send_request(request)
380 1
        response = ua.CreateSubscriptionResponse.from_binary(data)
381 1
        response.ResponseHeader.ServiceResult.check()
382 1
        with self._lock:
383 1
            self._publishcallbacks[response.Parameters.SubscriptionId] = callback
384 1
        return response.Parameters
385
386 1
    def delete_subscriptions(self, subscriptionids):
387 1
        self.logger.info("delete_subscription")
388 1
        request = ua.DeleteSubscriptionsRequest()
389 1
        request.Parameters.SubscriptionIds = subscriptionids
390 1
        data = self._uasocket.send_request(request)
391 1
        response = ua.DeleteSubscriptionsResponse.from_binary(data)
392 1
        response.ResponseHeader.ServiceResult.check()
393 1
        for sid in subscriptionids:
394 1
            with self._lock:
395 1
                self._publishcallbacks.pop(sid)
396 1
        return response.Results
397
398 1
    def publish(self, acks=None):
399 1
        self.logger.info("publish")
400 1
        if acks is None:
401 1
            acks = []
402 1
        request = ua.PublishRequest()
403 1
        request.Parameters.SubscriptionAcknowledgements = acks
404 1
        self._uasocket.send_request(request, self._call_publish_callback, timeout=int(9e8))  # timeout could be set to 0 but some servers to not support it
405
406 1
    def _call_publish_callback(self, future):
407 1
        self.logger.info("call_publish_callback")
408 1
        data = future.result()
409 1
        self._uasocket.check_answer(data, "ServiceFault received from server while waiting for publish response")
410 1
        response = ua.PublishResponse.from_binary(data)
411 1
        with self._lock:
412 1
            if response.Parameters.SubscriptionId not in self._publishcallbacks:
413 1
                self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
414 1
                return
415 1
            callback = self._publishcallbacks[response.Parameters.SubscriptionId]
416 1
        try:
417 1
            callback(response.Parameters)
418
        except Exception:  # we call client code, catch everything!
419
            self.logger.exception("Exception while calling user callback: %s")
420
421 1
    def create_monitored_items(self, params):
422 1
        self.logger.info("create_monitored_items")
423 1
        request = ua.CreateMonitoredItemsRequest()
424 1
        request.Parameters = params
425 1
        data = self._uasocket.send_request(request)
426 1
        response = ua.CreateMonitoredItemsResponse.from_binary(data)
427 1
        response.ResponseHeader.ServiceResult.check()
428 1
        return response.Results
429
430 1
    def delete_monitored_items(self, params):
431 1
        self.logger.info("delete_monitored_items")
432 1
        request = ua.DeleteMonitoredItemsRequest()
433 1
        request.Parameters = params
434 1
        data = self._uasocket.send_request(request)
435 1
        response = ua.DeleteMonitoredItemsResponse.from_binary(data)
436 1
        response.ResponseHeader.ServiceResult.check()
437 1
        return response.Results
438
439 1
    def add_nodes(self, nodestoadd):
440 1
        self.logger.info("add_nodes")
441 1
        request = ua.AddNodesRequest()
442 1
        request.Parameters.NodesToAdd = nodestoadd
443 1
        data = self._uasocket.send_request(request)
444 1
        response = ua.AddNodesResponse.from_binary(data)
445 1
        response.ResponseHeader.ServiceResult.check()
446 1
        return response.Results
447
448 1
    def call(self, methodstocall):
449 1
        request = ua.CallRequest()
450 1
        request.Parameters.MethodsToCall = methodstocall
451 1
        data = self._uasocket.send_request(request)
452 1
        response = ua.CallResponse.from_binary(data)
453 1
        response.ResponseHeader.ServiceResult.check()
454 1
        return response.Results
455
456 1
    def history_read(self, params):
457
        self.logger.info("history_read")
458
        request = ua.HistoryReadRequest()
459
        request.Parameters = params
460
        data = self._uasocket.send_request(request)
461
        response = ua.HistoryReadResponse.from_binary(data)
462
        response.ResponseHeader.ServiceResult.check()
463
        return response.Results
464