Completed
Push — dev ( 4b39c9 )
by Olivier
06:48
created

opcua.UASocketClient._call_callback()   A

Complexity

Conditions 3

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 3
Metric Value
dl 0
loc 6
ccs 6
cts 6
cp 1
rs 9.4286
cc 3
crap 3
1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9
10 1
import opcua.uaprotocol as ua
11 1
import opcua.utils as utils
12
13 1
class UASocketClient(object):
    """
    Handle the socket connection to a server and exchange ua messages over it.

    Requests are written from the calling thread. Answers are read by a
    background thread (see start and _run) which resolves the Future that
    was registered for the matching request id.

    timeout is the time in seconds to wait for an ua answer from server
    security_policy is handed to the chunk building/parsing helpers; when
    None, a fresh ua.SecurityPolicy() is created for this client
    """

    def __init__(self, timeout=1, security_policy=None):
        # A ua.SecurityPolicy() default in the signature would be evaluated
        # once, at definition time, and then shared by every client that
        # did not pass its own policy. Build one per instance instead.
        if security_policy is None:
            security_policy = ua.SecurityPolicy()
        self.logger = logging.getLogger(__name__ + "Socket")
        self._thread = None
        # protects the counters below and _callbackmap, which are touched
        # both by callers and by the receiving thread
        self._lock = Lock()
        self.timeout = timeout
        self._socket = None
        self._do_stop = False
        self._security_token = ua.ChannelSecurityToken()
        self.authentication_token = ua.NodeId()
        self._sequence_number = 0
        self._request_id = 0
        self._request_handle = 0
        # request id -> Future waiting for the answer to that request
        self._callbackmap = {}
        self._security_policy = security_policy
        # replaced by the server's SendBufferSize in send_hello
        self._max_chunk_size = 65536

    def start(self):
        """
        Start receiving thread.
        this is called automatically in connect and
        should not be necessary to call directly
        """
        self._thread = Thread(target=self._run)
        self._thread.start()

    def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server, lower-level method
        timeout is the timeout written in ua header
        callback, if given, is attached to the future as done-callback
        returns future, resolved by the receiving thread with the answer body
        """
        with self._lock:
            request.RequestHeader = self._create_request_header(timeout)
            try:
                binreq = request.to_binary()
            except BaseException:
                # reset request handle if any error
                # see self._create_request_header
                self._request_handle -= 1
                raise
            self._request_id += 1
            future = Future()
            if callback:
                future.add_done_callback(callback)
            # register before writing so the answer can never arrive
            # before its future is known
            self._callbackmap[self._request_id] = future
            chunks = ua.MessageChunk.message_to_chunks(
                self._security_policy, binreq, self._max_chunk_size,
                message_type=message_type,
                channel_id=self._security_token.ChannelId,
                request_id=self._request_id,
                token_id=self._security_token.TokenId)
            for chunk in chunks:
                # every chunk gets its own, increasing sequence number
                self._sequence_number += 1
                chunk.SequenceHeader.SequenceNumber = self._sequence_number
                self._socket.write(chunk.to_binary())
        return future

    def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server.
        timeout is the timeout written in ua header
        returns response object if no callback is provided; with a callback
        nothing is returned and the callback receives the future once the
        answer has arrived
        """
        future = self._send_request(request, callback, timeout, message_type)
        if not callback:
            # blocks for at most self.timeout seconds
            data = future.result(self.timeout)
            self.check_answer(data, " in response to " + request.__class__.__name__)
            return data

    def check_answer(self, data, context):
        """
        Look at the type id at the start of an answer and detect a ServiceFault.

        context is only used in the log message.
        returns True when the answer is not a ServiceFault. For a
        ServiceFault the ServiceResult of its header is checked
        (NOTE(review): check() presumably raises for a bad status - confirm)
        and False is returned if that check passes.
        """
        # decode from a copy; the caller decodes the full response from
        # data afterwards
        data = data.copy()
        typeid = ua.NodeId.from_binary(data)
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
            self.logger.warning("ServiceFault from server received %s", context)
            hdr = ua.ResponseHeader.from_binary(data)
            hdr.ServiceResult.check()
            return False
        return True

    def _run(self):
        """
        Body of the receiving thread: receive messages until a stop has
        been requested or the socket reports that it has been closed.
        """
        self.logger.info("Thread started")
        while not self._do_stop:
            try:
                self._receive()
            except ua.utils.SocketClosedException:
                self.logger.info("Socket has closed connection")
                break
        self.logger.info("Thread ended")

    def _receive(self):
        """
        Read one message from the socket and hand it to the waiting future.

        Chunked messages are read until a non-intermediate chunk arrives
        and their bodies are concatenated. An Acknowledge is delivered to
        the future registered under id 0 (see send_hello). An ErrorMessage
        is only logged. Anything else raises Exception.
        """
        msg = ua.tcp_message_from_socket(self._security_policy, self._socket)
        if isinstance(msg, ua.MessageChunk):
            chunks = [msg]
            # TODO: check everything
            while chunks[-1].MessageHeader.ChunkType == ua.ChunkType.Intermediate:
                chunks.append(ua.tcp_message_from_socket(self._security_policy, self._socket))
            body = b"".join([c.Body for c in chunks])
            # the request id is taken from the first chunk
            self._call_callback(msg.SequenceHeader.RequestId, utils.Buffer(body))
        elif isinstance(msg, ua.Acknowledge):
            self._call_callback(0, msg)
        elif isinstance(msg, ua.ErrorMessage):
            self.logger.warning("Received an error: {}".format(msg))
        else:
            raise Exception("Unsupported message type: {}".format(msg))

    def _call_callback(self, request_id, body):
        """
        Resolve the future registered for request_id with body.

        The future is removed from the map. Raises Exception when no
        future is registered under request_id.
        """
        with self._lock:
            future = self._callbackmap.pop(request_id, None)
            if future is None:
                raise Exception("No future object found for request: {}, callbacks in list are {}".format(request_id, self._callbackmap.keys()))
        # set the result outside the lock: done-callbacks run here and may
        # send new requests, which need the lock themselves
        future.set_result(body)

    def _write_socket(self, hdr, *args):
        """
        Serialize args, add their sizes to hdr and write header followed by
        all serialized parts to the socket in a single write.
        """
        alle = []
        for arg in args:
            data = arg.to_binary()
            hdr.add_size(len(data))
            self.logger.debug("writting to socket: %s with length %s ", type(arg), len(data))
            self.logger.debug("struct: %s", arg)
            self.logger.debug("data: %s", data)
            alle.append(data)
        # the header is serialized last, once all sizes have been added
        alle.insert(0, hdr.to_binary())
        alle = b"".join(alle)
        self._socket.write(alle)

    def _create_request_header(self, timeout=1000):
        """
        Build a RequestHeader carrying the current authentication token, a
        new request handle and timeout as TimeoutHint.

        Increments the request handle counter; _send_request calls this
        while holding the lock and rolls the counter back on failure.
        """
        hdr = ua.RequestHeader()
        hdr.AuthenticationToken = self.authentication_token
        self._request_handle += 1
        hdr.RequestHandle = self._request_handle
        hdr.TimeoutHint = timeout
        return hdr

    def connect_socket(self, host, port):
        """
        connect to server socket and start receiving thread
        """
        self.logger.info("opening connection")
        sock = socket.create_connection((host, port))
        # nodelay necessary to avoid packing in one frame, some servers do not like it
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        self._socket = utils.SocketWrapper(sock)
        self.start()

    def disconnect_socket(self):
        """
        Ask the receiving thread to stop and shut down the sending side of
        the socket. The thread is not joined here.
        """
        self.logger.info("stop request")
        self._do_stop = True
        self._socket.socket.shutdown(socket.SHUT_WR)

    def send_hello(self, url):
        """
        Send a Hello message for url and wait for the Acknowledge.

        The SendBufferSize of the Acknowledge becomes the maximum chunk
        size used for later requests.
        returns the Acknowledge message
        """
        hello = ua.Hello()
        hello.EndpointUrl = url
        header = ua.Header(ua.MessageType.Hello, ua.ChunkType.Single)
        future = Future()
        with self._lock:
            # Hello is not a numbered request; its answer is delivered
            # under id 0 by _receive
            self._callbackmap[0] = future
        self._write_socket(header, hello)
        ack = future.result(self.timeout)
        # client shouldn't send chunks larger than this
        self._max_chunk_size = ack.SendBufferSize
        return ack

    def open_secure_channel(self, params):
        """
        Send an OpenSecureChannelRequest with params and wait for the answer.

        The security token of the response is stored and used for all
        following requests.
        returns the Parameters of the response
        """
        self.logger.info("open_secure_channel")
        request = ua.OpenSecureChannelRequest()
        request.Parameters = params
        future = self._send_request(request, message_type=ua.MessageType.SecureOpen)

        response = ua.OpenSecureChannelResponse.from_binary(future.result(self.timeout))
        response.ResponseHeader.ServiceResult.check()
        self._security_token = response.Parameters.SecurityToken
        return response.Parameters

    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepare to reconnect.
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close socket
        """
        self.logger.info("close_secure_channel")
        request = ua.CloseSecureChannelRequest()
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
        with self._lock:
            # don't expect any more answers
            future.cancel()
            self._callbackmap.clear()

        # some servers send a response here, most do not ... so we ignore
202
203
204
205 1
class BinaryClient(object):

    """
    low level OPC-UA client.
    implement all(well..one day) methods defined in opcua spec
    taking in argument the structures defined in opcua spec
    in python most of the structures are defined in
    uaprotocol_auto.py and uaprotocol_hand.py

    Every service method builds the request structure, sends it through
    the UASocketClient created in connect_socket, decodes the answer and
    checks the ServiceResult of its response header.
    """

    def __init__(self, timeout=1):
        self.logger = logging.getLogger(__name__)
        # subscription id -> user callback called with publish results
        self._publishcallbacks = {}
        # protects _publishcallbacks, which is also read from the
        # publish done-callback
        self._lock = Lock()
        self._timeout = timeout
        # created in connect_socket
        self._uasocket = None
        self._security_policy = ua.SecurityPolicy()

    def _send_and_decode(self, request, response_class):
        """
        Send request, wait for the answer, decode it with
        response_class.from_binary and check the ServiceResult of the
        response header.
        NOTE(review): ServiceResult.check() presumably raises for a bad
        status - confirm in the ua module.
        returns the decoded response
        """
        data = self._uasocket.send_request(request)
        response = response_class.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response

    def set_security(self, policy):
        """
        Set the security policy handed to the socket client created by the
        next call to connect_socket.
        """
        self._security_policy = policy

    def connect_socket(self, host, port):
        """
        connect to server socket and start receiving thread
        """
        self._uasocket = UASocketClient(self._timeout, security_policy=self._security_policy)
        return self._uasocket.connect_socket(host, port)

    def disconnect_socket(self):
        """
        Stop the receiving thread and shut down the socket, see
        UASocketClient.disconnect_socket.
        """
        return self._uasocket.disconnect_socket()

    def send_hello(self, url):
        """
        Send a Hello message for url, returns the Acknowledge from the server.
        """
        return self._uasocket.send_hello(url)

    def open_secure_channel(self, params):
        """
        Open a secure channel with params, returns the response Parameters.
        """
        return self._uasocket.open_secure_channel(params)

    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepare to reconnect
        """
        return self._uasocket.close_secure_channel()

    def create_session(self, parameters):
        """
        Call the CreateSession service.

        The authentication token of the response is stored on the socket
        client, which puts it in the header of every following request.
        returns the Parameters of the response
        """
        self.logger.info("create_session")
        request = ua.CreateSessionRequest()
        request.Parameters = parameters
        response = self._send_and_decode(request, ua.CreateSessionResponse)
        self._uasocket.authentication_token = response.Parameters.AuthenticationToken
        return response.Parameters

    def activate_session(self, parameters):
        """
        Call the ActivateSession service, returns the Parameters of the response.
        """
        self.logger.info("activate_session")
        request = ua.ActivateSessionRequest()
        request.Parameters = parameters
        response = self._send_and_decode(request, ua.ActivateSessionResponse)
        return response.Parameters

    def close_session(self, deletesubscriptions):
        """
        Call the CloseSession service. Nothing is returned.
        """
        self.logger.info("close_session")
        request = ua.CloseSessionRequest()
        request.DeleteSubscriptions = deletesubscriptions
        data = self._uasocket.send_request(request)
        # The response is decoded but its ServiceResult is deliberately not
        # checked: it seems we sent wrong session Id, but where is the
        # sessionId supposed to be sent???
        ua.CloseSessionResponse.from_binary(data)

    def browse(self, parameters):
        """
        Call the Browse service, returns the Results of the response.
        """
        self.logger.info("browse")
        request = ua.BrowseRequest()
        request.Parameters = parameters
        return self._send_and_decode(request, ua.BrowseResponse).Results

    def read(self, parameters):
        """
        Call the Read service, returns the Results of the response.
        """
        self.logger.info("read")
        request = ua.ReadRequest()
        request.Parameters = parameters
        return self._send_and_decode(request, ua.ReadResponse).Results

    def write(self, params):
        """
        Call the Write service, returns the Results of the response.
        """
        self.logger.info("write")
        request = ua.WriteRequest()
        request.Parameters = params
        return self._send_and_decode(request, ua.WriteResponse).Results

    def get_endpoints(self, params):
        """
        Call the GetEndpoints service, returns the Endpoints of the response.
        """
        self.logger.info("get_endpoint")
        request = ua.GetEndpointsRequest()
        request.Parameters = params
        return self._send_and_decode(request, ua.GetEndpointsResponse).Endpoints

    def find_servers(self, params):
        """
        Call the FindServers service, returns the Servers of the response.
        """
        self.logger.info("find_servers")
        request = ua.FindServersRequest()
        request.Parameters = params
        return self._send_and_decode(request, ua.FindServersResponse).Servers

    def find_servers_on_network(self, params):
        """
        Call the FindServersOnNetwork service, returns the Parameters of
        the response.
        """
        self.logger.info("find_servers_on_network")
        request = ua.FindServersOnNetworkRequest()
        request.Parameters = params
        return self._send_and_decode(request, ua.FindServersOnNetworkResponse).Parameters

    def register_server(self, registered_server):
        """
        Call the RegisterServer service for registered_server.
        """
        self.logger.info("register_server")
        request = ua.RegisterServerRequest()
        request.Server = registered_server
        self._send_and_decode(request, ua.RegisterServerResponse)
        # nothing to return for this service

    def register_server2(self, params):
        """
        Call the RegisterServer2 service, returns the ConfigurationResults
        of the response.
        """
        self.logger.info("register_server2")
        request = ua.RegisterServer2Request()
        request.Parameters = params
        return self._send_and_decode(request, ua.RegisterServer2Response).ConfigurationResults

    def translate_browsepaths_to_nodeids(self, browsepaths):
        """
        Call the TranslateBrowsePathsToNodeIds service for browsepaths,
        returns the Results of the response.
        """
        self.logger.info("translate_browsepath_to_nodeid")
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
        request.Parameters.BrowsePaths = browsepaths
        return self._send_and_decode(request, ua.TranslateBrowsePathsToNodeIdsResponse).Results

    def create_subscription(self, params, callback):
        """
        Call the CreateSubscription service.

        callback is remembered under the new subscription id and is called
        with the Parameters of every publish response for that subscription.
        returns the Parameters of the response
        """
        self.logger.info("create_subscription")
        request = ua.CreateSubscriptionRequest()
        request.Parameters = params
        response = self._send_and_decode(request, ua.CreateSubscriptionResponse)
        with self._lock:
            self._publishcallbacks[response.Parameters.SubscriptionId] = callback
        return response.Parameters

    def delete_subscriptions(self, subscriptionids):
        """
        Call the DeleteSubscriptions service and forget the callbacks
        registered for subscriptionids.

        Raises KeyError if one of the ids has no registered callback.
        returns the Results of the response
        """
        self.logger.info("delete_subscription")
        request = ua.DeleteSubscriptionsRequest()
        request.Parameters.SubscriptionIds = subscriptionids
        response = self._send_and_decode(request, ua.DeleteSubscriptionsResponse)
        with self._lock:
            for sid in subscriptionids:
                self._publishcallbacks.pop(sid)
        return response.Results

    def publish(self, acks=None):
        """
        Send a PublishRequest acknowledging acks (nothing when None).

        Does not wait for the answer: the response is handled by
        _call_publish_callback once it arrives.
        """
        self.logger.info("publish")
        if acks is None:
            acks = []
        request = ua.PublishRequest()
        request.Parameters.SubscriptionAcknowledgements = acks
        # timeout could be set to 0 but some servers do not support it
        self._uasocket.send_request(request, self._call_publish_callback, timeout=int(9e8))

    def _call_publish_callback(self, future):
        """
        Done-callback of a publish request: decode the PublishResponse and
        pass its Parameters to the callback registered for the subscription.

        Responses for unknown subscriptions are logged and dropped.
        Exceptions raised by the user callback are logged, not propagated.
        """
        self.logger.info("call_publish_callback")
        data = future.result()
        self._uasocket.check_answer(data, "ServiceFault received from server while waiting for publish response")
        response = ua.PublishResponse.from_binary(data)
        subscription_id = response.Parameters.SubscriptionId
        with self._lock:
            if subscription_id not in self._publishcallbacks:
                self.logger.warning("Received data for unknown subscription: %s ", subscription_id)
                return
            callback = self._publishcallbacks[subscription_id]
        # call user code outside the lock
        try:
            callback(response.Parameters)
        except Exception:  # we call client code, catch everything!
            self.logger.exception("Exception while calling user callback: %s", callback)

    def create_monitored_items(self, params):
        """
        Call the CreateMonitoredItems service, returns the Results of the response.
        """
        self.logger.info("create_monitored_items")
        request = ua.CreateMonitoredItemsRequest()
        request.Parameters = params
        return self._send_and_decode(request, ua.CreateMonitoredItemsResponse).Results

    def delete_monitored_items(self, params):
        """
        Call the DeleteMonitoredItems service, returns the Results of the response.
        """
        self.logger.info("delete_monitored_items")
        request = ua.DeleteMonitoredItemsRequest()
        request.Parameters = params
        return self._send_and_decode(request, ua.DeleteMonitoredItemsResponse).Results

    def add_nodes(self, nodestoadd):
        """
        Call the AddNodes service for nodestoadd, returns the Results of
        the response.
        """
        self.logger.info("add_nodes")
        request = ua.AddNodesRequest()
        request.Parameters.NodesToAdd = nodestoadd
        return self._send_and_decode(request, ua.AddNodesResponse).Results

    def call(self, methodstocall):
        """
        Call the Call service for methodstocall, returns the Results of
        the response.
        """
        self.logger.info("call")
        request = ua.CallRequest()
        request.Parameters.MethodsToCall = methodstocall
        return self._send_and_decode(request, ua.CallResponse).Results

    def history_read(self, params):
        """
        Call the HistoryRead service, returns the Results of the response.
        """
        self.logger.info("history_read")
        request = ua.HistoryReadRequest()
        request.Parameters = params
        return self._send_and_decode(request, ua.HistoryReadResponse).Results
446