Completed
Push — master ( a80a7b...be43b4 )
by Olivier
02:17
created

opcua.UASocketClient   A

Complexity

Total Complexity 33

Size/Duplication

Total Lines 187
Duplicated Lines 0 %

Test Coverage

Coverage 91.91%
Metric Value
wmc 33
dl 0
loc 187
ccs 125
cts 136
cp 0.9191
rs 9.4

15 Methods

Rating   Name   Duplication   Size   Complexity  
A send_request() 0 11 2
A check_answer() 0 9 2
A __init__() 0 15 1
A open_secure_channel() 0 10 1
A _create_request_header() 0 7 1
A _write_socket() 0 12 2
A connect_socket() 0 9 1
A send_hello() 0 11 2
A start() 0 8 1
B _send_request() 0 29 5
B _receive() 0 15 6
A close_secure_channel() 0 13 2
A disconnect_socket() 0 4 1
A _run() 0 9 3
A _call_callback() 0 6 3
1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9
10 1
import opcua.uaprotocol as ua
11 1
import opcua.utils as utils
12
13 1
class UASocketClient(object):
    """
    Handle the socket connection and send ua messages.

    A background thread (see start() and _run()) reads messages from the
    socket and resolves the Future registered in the callback map under
    the request id of the incoming message.

    timeout is the timeout used while waiting for an ua answer from server
    (it is passed to Future.result(), i.e. expressed in seconds).
    """

    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy()):
        self.logger = logging.getLogger(__name__ + "Socket")
        self._thread = None
        # protects the counters and the callback map shared with the receiving thread
        self._lock = Lock()
        self.timeout = timeout
        self._socket = None
        self._do_stop = False
        self._security_token = ua.ChannelSecurityToken()
        self.authentication_token = ua.NodeId()
        self._sequence_number = 0
        self._request_id = 0
        self._request_handle = 0
        # request id -> Future resolved by the receiving thread
        self._callbackmap = {}
        self._security_policy = security_policy
        # replaced by the server's SendBufferSize in send_hello()
        self._max_chunk_size = 65536

    def start(self):
        """
        Start receiving thread.
        this is called automatically in connect and
        should not be necessary to call directly
        """
        self._thread = Thread(target=self._run)
        self._thread.start()

    def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server, lower-level method
        timeout is the timeout written in ua header
        callback, if given, is attached to the future as a done-callback
        returns future
        """
        with self._lock:
            request.RequestHeader = self._create_request_header(timeout)
            try:
                binreq = request.to_binary()
            except BaseException:
                # reset request handle if any error (including interrupts),
                # see self._create_request_header
                self._request_handle -= 1
                raise
            self._request_id += 1
            future = Future()
            if callback:
                future.add_done_callback(callback)
            # register before writing so the answer can never arrive first
            self._callbackmap[self._request_id] = future
            for chunk in ua.MessageChunk.message_to_chunks(self._security_policy, binreq, self._max_chunk_size,
                                                           message_type=message_type,
                                                           channel_id=self._security_token.ChannelId,
                                                           request_id=self._request_id,
                                                           token_id=self._security_token.TokenId):
                self._sequence_number += 1
                chunk.SequenceHeader.SequenceNumber = self._sequence_number
                self._socket.write(chunk.to_binary())
        return future

    def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server.
        timeout is the timeout written in ua header
        returns response object if no callback is provided;
        with a callback the call returns None immediately and the callback
        receives the future once the answer has arrived
        """
        future = self._send_request(request, callback, timeout, message_type)
        if not callback:
            # blocks for at most self.timeout, Future.result raises on expiry
            data = future.result(self.timeout)
            self.check_answer(data, " in response to " + request.__class__.__name__)
            return data

    def check_answer(self, data, context):
        """
        Look at the type id at the start of an answer and report ServiceFault.

        context is only used in the log message.
        returns True when the answer is not a ServiceFault. For a
        ServiceFault the status of the response header is checked
        (presumably raising on a bad status -- TODO confirm) and False is
        returned otherwise.
        """
        # work on a copy so the caller's buffer is presumably left untouched -- TODO confirm
        data = data.copy(50)  # FIXME check max length nodeid + responseheader
        typeid = ua.NodeId.from_binary(data)
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
            self.logger.warning("ServiceFault from server received %s", context)
            hdr = ua.ResponseHeader.from_binary(data)
            hdr.ServiceResult.check()
            return False
        return True

    def _run(self):
        """
        Main loop of the receiving thread.

        Runs until disconnect_socket() sets the stop flag or the socket is
        closed. Errors that are not transport failures (for example an
        answer nobody is waiting for any more) are logged and the loop
        keeps going, so that one unexpected message does not silently kill
        the thread and make every later request time out.
        """
        self.logger.info("Thread started")
        while not self._do_stop:
            try:
                self._receive()
            except ua.utils.SocketClosedException:
                self.logger.info("Socket has closed connection")
                break
            except socket.error:
                # transport is unusable, nothing more can be received
                self.logger.exception("Socket error, stopping receiving thread")
                break
            except Exception:
                self.logger.exception("Error while handling message from server")
        self.logger.info("Thread ended")

    def _receive(self):
        """
        Read one message from the socket and dispatch it.

        Chunked messages are re-assembled before the registered future is
        resolved; an Acknowledge resolves the future stored under id 0
        (see send_hello). Raises Exception on an unsupported message type.
        """
        msg = ua.tcp_message_from_socket(self._security_policy, self._socket)
        if isinstance(msg, ua.MessageChunk):
            chunks = [msg]
            # TODO: check everything
            while chunks[-1].MessageHeader.ChunkType == ua.ChunkType.Intermediate:
                chunks.append(ua.tcp_message_from_socket(self._security_policy, self._socket))
            body = b"".join([c.Body for c in chunks])
            self._call_callback(msg.SequenceHeader.RequestId, utils.Buffer(body))
        elif isinstance(msg, ua.Acknowledge):
            self._call_callback(0, msg)
        elif isinstance(msg, ua.ErrorMessage):
            self.logger.warning("Received an error: {}".format(msg))
        else:
            raise Exception("Unsupported message type: {}".format(msg))

    def _call_callback(self, request_id, body):
        """
        Resolve the future registered for request_id with body.

        Raises Exception when no future is registered under request_id.
        """
        with self._lock:
            future = self._callbackmap.pop(request_id, None)
            if future is None:
                raise Exception("No future object found for request: {}, callbacks in list are {}".format(request_id, self._callbackmap.keys()))
        # resolve outside the lock: done-callbacks run here and may send new requests
        future.set_result(body)

    def _write_socket(self, hdr, *args):
        """
        Serialize hdr followed by every object in args and write the
        result to the socket in one call.
        """
        alle = []
        for arg in args:
            data = arg.to_binary()
            # the header must know the total size before it is serialized
            hdr.add_size(len(data))
            self.logger.debug("writting to socket: %s with length %s ", type(arg), len(data))
            self.logger.debug("struct: %s", arg)
            self.logger.debug("data: %s", data)
            alle.append(data)
        alle.insert(0, hdr.to_binary())
        alle = b"".join(alle)
        self._socket.write(alle)

    def _create_request_header(self, timeout=1000):
        """
        Build a request header carrying the current authentication token,
        a new request handle and timeout as TimeoutHint.
        """
        hdr = ua.RequestHeader()
        hdr.AuthenticationToken = self.authentication_token
        self._request_handle += 1
        hdr.RequestHandle = self._request_handle
        hdr.TimeoutHint = timeout
        return hdr

    def connect_socket(self, host, port):
        """
        connect to server socket and start receiving thread
        """
        self.logger.info("opening connection")
        sock = socket.create_connection((host, port))
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # nodelay necessary to avoid packing in one frame, some servers do not like it
        self._socket = utils.SocketWrapper(sock)
        self.start()

    def disconnect_socket(self):
        """
        Ask the receiving thread to stop and shut down the sending side
        of the socket.
        """
        self.logger.info("stop request")
        self._do_stop = True
        self._socket.socket.shutdown(socket.SHUT_WR)

    def send_hello(self, url):
        """
        Send a Hello message for url and wait for the Acknowledge.

        The answer is awaited for at most self.timeout and returned.
        """
        hello = ua.Hello()
        hello.EndpointUrl = url
        header = ua.Header(ua.MessageType.Hello, ua.ChunkType.Single)
        future = Future()
        with self._lock:
            # Hello carries no request id, _receive resolves id 0 on Acknowledge
            self._callbackmap[0] = future
        self._write_socket(header, hello)
        ack = future.result(self.timeout)
        self._max_chunk_size = ack.SendBufferSize  # client shouldn't send chunks larger than this
        return ack

    def open_secure_channel(self, params):
        """
        Send an OpenSecureChannel request with params, store the security
        token of the answer and return the response parameters.
        """
        self.logger.info("open_secure_channel")
        request = ua.OpenSecureChannelRequest()
        request.Parameters = params
        future = self._send_request(request, message_type=ua.MessageType.SecureOpen)

        response = ua.OpenSecureChannelResponse.from_binary(future.result(self.timeout))
        response.ResponseHeader.ServiceResult.check()
        self._security_token = response.Parameters.SecurityToken
        return response.Parameters

    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepare to reconnect.
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close socket
        """
        self.logger.info("close_secure_channel")
        request = ua.CloseSecureChannelRequest()
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
        with self._lock:
            # don't expect any more answers
            future.cancel()
            self._callbackmap.clear()

        # some servers send a response here, most do not ... so we ignore
202
203
204
205 1
class BinaryClient(object):
    """
    low level OPC-UA client.
    implement all(well..one day) methods defined in opcua spec
    taking in argument the structures defined in opcua spec
    in python most of the structures are defined in
    uaprotocol_auto.py and uaprotocol_hand.py

    Every service method builds the request structure, sends it through
    the UASocketClient created in connect_socket() and decodes the answer.
    """

    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy()):
        self.logger = logging.getLogger(__name__)
        # subscription id -> user callback invoked for every publish response
        self._publishcallbacks = {}
        # protects _publishcallbacks, which is also read from done-callbacks
        self._lock = Lock()
        self._timeout = timeout
        self._uasocket = None
        self._security_policy = security_policy

    def _send_checked(self, request, response_class):
        """
        Send request, decode the answer with response_class.from_binary
        and check the ServiceResult of its response header.
        returns the decoded response
        """
        data = self._uasocket.send_request(request)
        response = response_class.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response

    def connect_socket(self, host, port):
        """
        connect to server socket and start receiving thread
        """
        self._uasocket = UASocketClient(self._timeout, security_policy=self._security_policy)
        return self._uasocket.connect_socket(host, port)

    def disconnect_socket(self):
        """
        Forward to the disconnect_socket() of the underlying socket client.
        """
        return self._uasocket.disconnect_socket()

    def send_hello(self, url):
        """
        Forward to the send_hello() of the underlying socket client.
        """
        return self._uasocket.send_hello(url)

    def open_secure_channel(self, params):
        """
        Forward to the open_secure_channel() of the underlying socket client.
        """
        return self._uasocket.open_secure_channel(params)

    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepare to reconnect
        """
        return self._uasocket.close_secure_channel()

    def create_session(self, parameters):
        """
        Send a CreateSession request and remember the authentication token
        of the answer for all following requests.
        returns the response parameters
        """
        self.logger.info("create_session")
        request = ua.CreateSessionRequest()
        request.Parameters = parameters
        response = self._send_checked(request, ua.CreateSessionResponse)
        self._uasocket.authentication_token = response.Parameters.AuthenticationToken
        return response.Parameters

    def activate_session(self, parameters):
        """
        Send an ActivateSession request, returns the response parameters.
        """
        self.logger.info("activate_session")
        request = ua.ActivateSessionRequest()
        request.Parameters = parameters
        return self._send_checked(request, ua.ActivateSessionResponse).Parameters

    def close_session(self, deletesubscriptions):
        """
        Send a CloseSession request. The answer is decoded but its
        ServiceResult is deliberately not checked, nothing is returned.
        """
        self.logger.info("close_session")
        request = ua.CloseSessionRequest()
        request.DeleteSubscriptions = deletesubscriptions
        data = self._uasocket.send_request(request)
        ua.CloseSessionResponse.from_binary(data)
        # response.ResponseHeader.ServiceResult.check() #disabled, it seems we sent wrong session Id, but where is the sessionId supposed to be sent???

    def browse(self, parameters):
        """
        Send a Browse request, returns the Results of the response.
        """
        self.logger.info("browse")
        request = ua.BrowseRequest()
        request.Parameters = parameters
        return self._send_checked(request, ua.BrowseResponse).Results

    def read(self, parameters):
        """
        Send a Read request, returns the Results of the response.
        """
        self.logger.info("read")
        request = ua.ReadRequest()
        request.Parameters = parameters
        return self._send_checked(request, ua.ReadResponse).Results

    def write(self, params):
        """
        Send a Write request, returns the Results of the response.
        """
        self.logger.info("write")
        request = ua.WriteRequest()
        request.Parameters = params
        return self._send_checked(request, ua.WriteResponse).Results

    def get_endpoints(self, params):
        """
        Send a GetEndpoints request, returns the Endpoints of the response.
        """
        self.logger.info("get_endpoint")
        request = ua.GetEndpointsRequest()
        request.Parameters = params
        return self._send_checked(request, ua.GetEndpointsResponse).Endpoints

    def find_servers(self, params):
        """
        Send a FindServers request, returns the Servers of the response.
        """
        self.logger.info("find_servers")
        request = ua.FindServersRequest()
        request.Parameters = params
        return self._send_checked(request, ua.FindServersResponse).Servers

    def find_servers_on_network(self, params):
        """
        Send a FindServersOnNetwork request, returns the response parameters.
        """
        self.logger.info("find_servers_on_network")
        request = ua.FindServersOnNetworkRequest()
        request.Parameters = params
        return self._send_checked(request, ua.FindServersOnNetworkResponse).Parameters

    def register_server(self, registered_server):
        """
        Send a RegisterServer request for registered_server.
        """
        self.logger.info("register_server")
        request = ua.RegisterServerRequest()
        request.Server = registered_server
        self._send_checked(request, ua.RegisterServerResponse)
        # nothing to return for this service

    def register_server2(self, params):
        """
        Send a RegisterServer2 request, returns the ConfigurationResults
        of the response.
        """
        self.logger.info("register_server2")
        request = ua.RegisterServer2Request()
        request.Parameters = params
        return self._send_checked(request, ua.RegisterServer2Response).ConfigurationResults

    def translate_browsepaths_to_nodeids(self, browsepaths):
        """
        Send a TranslateBrowsePathsToNodeIds request for browsepaths,
        returns the Results of the response.
        """
        self.logger.info("translate_browsepath_to_nodeid")
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
        request.Parameters.BrowsePaths = browsepaths
        return self._send_checked(request, ua.TranslateBrowsePathsToNodeIdsResponse).Results

    def create_subscription(self, params, callback):
        """
        Send a CreateSubscription request and register callback under the
        id of the new subscription; it is called with the parameters of
        every publish response received for that subscription.
        returns the response parameters
        """
        self.logger.info("create_subscription")
        request = ua.CreateSubscriptionRequest()
        request.Parameters = params
        response = self._send_checked(request, ua.CreateSubscriptionResponse)
        with self._lock:
            self._publishcallbacks[response.Parameters.SubscriptionId] = callback
        return response.Parameters

    def delete_subscriptions(self, subscriptionids):
        """
        Send a DeleteSubscriptions request and forget the callbacks
        registered for subscriptionids.
        returns the Results of the response
        """
        self.logger.info("delete_subscription")
        request = ua.DeleteSubscriptionsRequest()
        request.Parameters.SubscriptionIds = subscriptionids
        response = self._send_checked(request, ua.DeleteSubscriptionsResponse)
        with self._lock:
            for sid in subscriptionids:
                # ids without a registered callback are skipped: the request
                # already succeeded and the remaining ids must still be removed
                self._publishcallbacks.pop(sid, None)
        return response.Results

    def publish(self, acks=None):
        """
        Send a Publish request acknowledging acks (nothing when None).
        The call does not wait for the answer, which is handled by
        _call_publish_callback.
        """
        self.logger.info("publish")
        if acks is None:
            acks = []
        request = ua.PublishRequest()
        request.Parameters.SubscriptionAcknowledgements = acks
        self._uasocket.send_request(request, self._call_publish_callback, timeout=int(9e8))  # timeout could be set to 0 but some servers to not support it

    def _call_publish_callback(self, future):
        """
        Done-callback of publish requests: decode the publish response and
        hand its parameters to the callback registered for the
        subscription. Exceptions raised by that callback are logged.
        """
        self.logger.info("call_publish_callback")
        data = future.result()
        self._uasocket.check_answer(data, "ServiceFault received from server while waiting for publish response")
        response = ua.PublishResponse.from_binary(data)
        with self._lock:
            if response.Parameters.SubscriptionId not in self._publishcallbacks:
                self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
                return
            callback = self._publishcallbacks[response.Parameters.SubscriptionId]
        # call user code outside the lock so it may use this client again
        try:
            callback(response.Parameters)
        except Exception:  # we call client code, catch everything!
            self.logger.exception("Exception while calling user callback")

    def create_monitored_items(self, params):
        """
        Send a CreateMonitoredItems request, returns the Results of the response.
        """
        self.logger.info("create_monitored_items")
        request = ua.CreateMonitoredItemsRequest()
        request.Parameters = params
        return self._send_checked(request, ua.CreateMonitoredItemsResponse).Results

    def delete_monitored_items(self, params):
        """
        Send a DeleteMonitoredItems request, returns the Results of the response.
        """
        self.logger.info("delete_monitored_items")
        request = ua.DeleteMonitoredItemsRequest()
        request.Parameters = params
        return self._send_checked(request, ua.DeleteMonitoredItemsResponse).Results

    def add_nodes(self, nodestoadd):
        """
        Send an AddNodes request for nodestoadd, returns the Results of the response.
        """
        self.logger.info("add_nodes")
        request = ua.AddNodesRequest()
        request.Parameters.NodesToAdd = nodestoadd
        return self._send_checked(request, ua.AddNodesResponse).Results

    def call(self, methodstocall):
        """
        Send a Call request for methodstocall, returns the Results of the response.
        """
        self.logger.info("call")
        request = ua.CallRequest()
        request.Parameters.MethodsToCall = methodstocall
        return self._send_checked(request, ua.CallResponse).Results

    def history_read(self, params):
        """
        Send a HistoryRead request, returns the Results of the response.
        """
        self.logger.info("history_read")
        request = ua.HistoryReadRequest()
        request.Parameters = params
        return self._send_checked(request, ua.HistoryReadResponse).Results
443