Passed
Pull Request — master (#589)
by
unknown
05:46
created

UASocketClient._create_request_header()   A

Complexity

Conditions 1

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 7
ccs 7
cts 7
cp 1
crap 1
rs 9.4285
1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9 1
from functools import partial
10
11 1
from opcua import ua
12 1
from opcua.ua.ua_binary import struct_from_binary, uatcp_to_binary, struct_to_binary, nodeid_from_binary
13 1
from opcua.ua.uaerrors import UaError, BadTimeout, BadNoSubscription, BadSessionClosed
14 1
from opcua.common.connection import SecureConnection
15
16
17 1
class UASocketClient(object):
18
    """
19
    handle socket connection and send ua messages
20
    timeout is the timeout used while waiting for an ua answer from server
21
    """
22 1
    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy()):
23 1
        self.logger = logging.getLogger(__name__ + ".Socket")
24 1
        self._thread = None
25 1
        self._lock = Lock()
26 1
        self.timeout = timeout
27 1
        self._socket = None
28 1
        self._do_stop = False
29 1
        self.authentication_token = ua.NodeId()
30 1
        self._request_id = 0
31 1
        self._request_handle = 0
32 1
        self._callbackmap = {}
33 1
        self._connection = SecureConnection(security_policy)
34
35 1
    def start(self):
36
        """
37
        Start receiving thread.
38
        this is called automatically in connect and
39
        should not be necessary to call directly
40
        """
41 1
        self._thread = Thread(target=self._run)
42 1
        self._thread.daemon = True
43 1
        self._thread.start()
44
45 1
    def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
46
        """
47
        send request to server, lower-level method
48
        timeout is the timeout written in ua header
49
        returns future
50
        """
51 1
        with self._lock:
52 1
            request.RequestHeader = self._create_request_header(timeout)
53 1
            self.logger.debug("Sending: %s", request)
54 1
            try:
55 1
                binreq = struct_to_binary(request)
56
            except:
57
                # reset reqeust handle if any error
58
                # see self._create_request_header
59
                self._request_handle -= 1
60
                raise
61 1
            self._request_id += 1
62 1
            future = Future()
63 1
            if callback:
64 1
                future.add_done_callback(callback)
65 1
            self._callbackmap[self._request_id] = future
66 1
            msg = self._connection.message_to_binary(binreq, message_type=message_type, request_id=self._request_id)
67 1
            self._socket.write(msg)
68 1
        return future
69
70 1
    def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
71
        """
72
        send request to server.
73
        timeout is the timeout written in ua header
74
        returns response object if no callback is provided
75
        """
76 1
        future = self._send_request(request, callback, timeout, message_type)
77 1
        if not callback:
78 1
            data = future.result(self.timeout)
79 1
            self.check_answer(data, " in response to " + request.__class__.__name__)
80 1
            return data
81
82 1
    def check_answer(self, data, context):
83 1
        data = data.copy()
84 1
        typeid = nodeid_from_binary(data)
85 1
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
86 1
            self.logger.warning("ServiceFault from server received %s", context)
87 1
            hdr = struct_from_binary(ua.ResponseHeader, data)
88 1
            hdr.ServiceResult.check()
89
            return False
90 1
        return True
91
92 1
    def _run(self):
93 1
        self.logger.info("Thread started")
94 1
        while not self._do_stop:
95 1
            try:
96 1
                self._receive()
97 1
            except ua.utils.SocketClosedException:
98 1
                self.logger.info("Socket has closed connection")
99 1
                break
100
            except UaError:
101
                self.logger.exception("Protocol Error")
102 1
        self.logger.info("Thread ended")
103
104 1
    def _receive(self):
105 1
        msg = self._connection.receive_from_socket(self._socket)
106 1
        if msg is None:
107
            return
108 1
        elif isinstance(msg, ua.Message):
109 1
            self._call_callback(msg.request_id(), msg.body())
110 1
        elif isinstance(msg, ua.Acknowledge):
111 1
            self._call_callback(0, msg)
112
        elif isinstance(msg, ua.ErrorMessage):
113
            self.logger.warning("Received an error: %s", msg)
114
        else:
115
            raise ua.UaError("Unsupported message type: %s", msg)
116
117 1
    def _call_callback(self, request_id, body):
118 1
        with self._lock:
119 1
            future = self._callbackmap.pop(request_id, None)
120 1
            if future is None:
121
                raise ua.UaError("No future object found for request: {0}, callbacks in list are {1}".format(request_id, self._callbackmap.keys()))
122 1
        future.set_result(body)
123
124 1
    def _create_request_header(self, timeout=1000):
125 1
        hdr = ua.RequestHeader()
126 1
        hdr.AuthenticationToken = self.authentication_token
127 1
        self._request_handle += 1
128 1
        hdr.RequestHandle = self._request_handle
129 1
        hdr.TimeoutHint = timeout
130 1
        return hdr
131
132 1
    def connect_socket(self, host, port):
133
        """
134
        connect to server socket and start receiving thread
135
        """
136 1
        self.logger.info("opening connection")
137 1
        sock = socket.create_connection((host, port))
138 1
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # nodelay ncessary to avoid packing in one frame, some servers do not like it
139 1
        self._socket = ua.utils.SocketWrapper(sock)
140 1
        self.start()
141
142 1
    def disconnect_socket(self):
143 1
        self.logger.info("stop request")
144 1
        self._do_stop = True
145 1
        self._socket.socket.shutdown(socket.SHUT_RDWR)
146 1
        self._socket.socket.close()
147
148 1
    def send_hello(self, url, max_messagesize = 0, max_chunkcount = 0):
149 1
        hello = ua.Hello()
150 1
        hello.EndpointUrl = url
151 1
        hello.MaxMessageSize = max_messagesize
152 1
        hello.MaxChunkCount = max_chunkcount
153 1
        future = Future()
154 1
        with self._lock:
155 1
            self._callbackmap[0] = future
156 1
        binmsg = uatcp_to_binary(ua.MessageType.Hello, hello)
157 1
        self._socket.write(binmsg)
158 1
        ack = future.result(self.timeout)
159 1
        return ack
160
161 1
    def open_secure_channel(self, params):
162 1
        self.logger.info("open_secure_channel")
163 1
        request = ua.OpenSecureChannelRequest()
164 1
        request.Parameters = params
165 1
        future = self._send_request(request, message_type=ua.MessageType.SecureOpen)
166
        
167
        # FIXME: we have a race condition here
168
        # we can get a packet with the new token id before we reach to store it..
169 1
        response = struct_from_binary(ua.OpenSecureChannelResponse, future.result(self.timeout))
170 1
        response.ResponseHeader.ServiceResult.check()
171 1
        self._connection.set_channel(response.Parameters)
172 1
        return response.Parameters
173
174 1
    def close_secure_channel(self):
175
        """
176
        close secure channel. It seems to trigger a shutdown of socket
177
        in most servers, so be prepare to reconnect.
178
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close socket
179
        """
180 1
        self.logger.info("close_secure_channel")
181 1
        request = ua.CloseSecureChannelRequest()
182 1
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
183 1
        with self._lock:
184
            # don't expect any more answers
185 1
            future.cancel()
186 1
            self._callbackmap.clear()
187
188
        # some servers send a response here, most do not ... so we ignore
189
190
191 1
class UaClient(object):
192
193
    """
194
    low level OPC-UA client.
195
196
    It implements (almost) all methods defined in opcua spec
197
    taking in argument the structures defined in opcua spec.
198
199
    In this Python implementation  most of the structures are defined in
200
    uaprotocol_auto.py and uaprotocol_hand.py available under opcua.ua
201
    """
202
203 1
    def __init__(self, timeout=1):
204 1
        self.logger = logging.getLogger(__name__)
205
        # _publishcallbacks should be accessed in recv thread only
206 1
        self._publishcallbacks = {}
207 1
        self._timeout = timeout
208 1
        self._uasocket = None
209 1
        self.security_policy = ua.SecurityPolicy()
210
211 1
    def set_security(self, policy):
212 1
        self.security_policy = policy
213
214 1
    def connect_socket(self, host, port):
215
        """
216
        connect to server socket and start receiving thread
217
        """
218 1
        self._uasocket = UASocketClient(self._timeout, security_policy=self.security_policy)
219 1
        return self._uasocket.connect_socket(host, port)
220
221 1
    def disconnect_socket(self):
222 1
        return self._uasocket.disconnect_socket()
223
224 1
    def send_hello(self, url, max_messagesize = 0, max_chunkcount = 0):
225 1
        return self._uasocket.send_hello(url, max_messagesize, max_chunkcount)
226
227 1
    def open_secure_channel(self, params):
228 1
        return self._uasocket.open_secure_channel(params)
229
230 1
    def close_secure_channel(self):
231
        """
232
        close secure channel. It seems to trigger a shutdown of socket
233
        in most servers, so be prepare to reconnect
234
        """
235 1
        return self._uasocket.close_secure_channel()
236
237 1
    def create_session(self, parameters):
238 1
        self.logger.info("create_session")
239 1
        request = ua.CreateSessionRequest()
240 1
        request.Parameters = parameters
241 1
        data = self._uasocket.send_request(request)
242 1
        response = struct_from_binary(ua.CreateSessionResponse, data)
243 1
        self.logger.debug(response)
244 1
        response.ResponseHeader.ServiceResult.check()
245 1
        self._uasocket.authentication_token = response.Parameters.AuthenticationToken
246 1
        return response.Parameters
247
248 1
    def activate_session(self, parameters):
249 1
        self.logger.info("activate_session")
250 1
        request = ua.ActivateSessionRequest()
251 1
        request.Parameters = parameters
252 1
        data = self._uasocket.send_request(request)
253 1
        response = struct_from_binary(ua.ActivateSessionResponse, data)
254 1
        self.logger.debug(response)
255 1
        response.ResponseHeader.ServiceResult.check()
256 1
        return response.Parameters
257
258 1
    def close_session(self, deletesubscriptions):
259 1
        self.logger.info("close_session")
260 1
        request = ua.CloseSessionRequest()
261 1
        request.DeleteSubscriptions = deletesubscriptions
262 1
        data = self._uasocket.send_request(request)
263 1
        response = struct_from_binary(ua.CloseSessionResponse, data)
264 1
        try:
265 1
            response.ResponseHeader.ServiceResult.check()
266
        except BadSessionClosed:
267
            # Problem: closing the session with open publish requests leads to BadSessionClosed responses
268
            #          we can just ignore it therefore.
269
            #          Alternatively we could make sure that there are no publish requests in flight when
270
            #          closing the session.
271
            pass
272
273 1
    def browse(self, parameters):
274 1
        self.logger.info("browse")
275 1
        request = ua.BrowseRequest()
276 1
        request.Parameters = parameters
277 1
        data = self._uasocket.send_request(request)
278 1
        response = struct_from_binary(ua.BrowseResponse, data)
279 1
        self.logger.debug(response)
280 1
        response.ResponseHeader.ServiceResult.check()
281 1
        return response.Results
282
283 1
    def browse_next(self, parameters):
284
        self.logger.info("browse next")
285
        request = ua.BrowseNextRequest()
286
        request.Parameters = parameters
287
        data = self._uasocket.send_request(request)
288
        response = struct_from_binary(ua.BrowseNextResponse, data)
289
        self.logger.debug(response)
290
        response.ResponseHeader.ServiceResult.check()
291
        return response.Parameters.Results
292
293 1
    def read(self, parameters):
294 1
        self.logger.info("read")
295 1
        request = ua.ReadRequest()
296 1
        request.Parameters = parameters
297 1
        data = self._uasocket.send_request(request)
298 1
        response = struct_from_binary(ua.ReadResponse, data)
299 1
        self.logger.debug(response)
300 1
        response.ResponseHeader.ServiceResult.check()
301
        # cast to Enum attributes that need to
302 1
        for idx, rv in enumerate(parameters.NodesToRead):
303 1
            if rv.AttributeId == ua.AttributeIds.NodeClass:
304 1
                dv = response.Results[idx]
305 1
                if dv.StatusCode.is_good():
306 1
                    dv.Value.Value = ua.NodeClass(dv.Value.Value)
307 1
            elif rv.AttributeId == ua.AttributeIds.ValueRank:
308 1
                dv = response.Results[idx]
309 1
                if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
310 1
                    dv.Value.Value = ua.ValueRank(dv.Value.Value)
311 1
        return response.Results
312
313 1
    def write(self, params):
314 1
        self.logger.info("read")
315 1
        request = ua.WriteRequest()
316 1
        request.Parameters = params
317 1
        data = self._uasocket.send_request(request)
318 1
        response = struct_from_binary(ua.WriteResponse, data)
319 1
        self.logger.debug(response)
320 1
        response.ResponseHeader.ServiceResult.check()
321 1
        return response.Results
322
323 1
    def get_endpoints(self, params):
324 1
        self.logger.info("get_endpoint")
325 1
        request = ua.GetEndpointsRequest()
326 1
        request.Parameters = params
327 1
        data = self._uasocket.send_request(request)
328 1
        response = struct_from_binary(ua.GetEndpointsResponse, data)
329 1
        self.logger.debug(response)
330 1
        response.ResponseHeader.ServiceResult.check()
331 1
        return response.Endpoints
332
333 1
    def find_servers(self, params):
334 1
        self.logger.info("find_servers")
335 1
        request = ua.FindServersRequest()
336 1
        request.Parameters = params
337 1
        data = self._uasocket.send_request(request)
338 1
        response = struct_from_binary(ua.FindServersResponse, data)
339 1
        self.logger.debug(response)
340 1
        response.ResponseHeader.ServiceResult.check()
341 1
        return response.Servers
342
343 1
    def find_servers_on_network(self, params):
344
        self.logger.info("find_servers_on_network")
345
        request = ua.FindServersOnNetworkRequest()
346
        request.Parameters = params
347
        data = self._uasocket.send_request(request)
348
        response = struct_from_binary(ua.FindServersOnNetworkResponse, data)
349
        self.logger.debug(response)
350
        response.ResponseHeader.ServiceResult.check()
351
        return response.Parameters
352
353 1
    def register_server(self, registered_server):
354 1
        self.logger.info("register_server")
355 1
        request = ua.RegisterServerRequest()
356 1
        request.Server = registered_server
357 1
        data = self._uasocket.send_request(request)
358 1
        response = struct_from_binary(ua.RegisterServerResponse, data)
359 1
        self.logger.debug(response)
360 1
        response.ResponseHeader.ServiceResult.check()
361
        # nothing to return for this service
362
363 1
    def register_server2(self, params):
364
        self.logger.info("register_server2")
365
        request = ua.RegisterServer2Request()
366
        request.Parameters = params
367
        data = self._uasocket.send_request(request)
368
        response = struct_from_binary(ua.RegisterServer2Response, data)
369
        self.logger.debug(response)
370
        response.ResponseHeader.ServiceResult.check()
371
        return response.ConfigurationResults
372
373 1
    def translate_browsepaths_to_nodeids(self, browsepaths):
374 1
        self.logger.info("translate_browsepath_to_nodeid")
375 1
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
376 1
        request.Parameters.BrowsePaths = browsepaths
377 1
        data = self._uasocket.send_request(request)
378 1
        response = struct_from_binary(ua.TranslateBrowsePathsToNodeIdsResponse, data)
379 1
        self.logger.debug(response)
380 1
        response.ResponseHeader.ServiceResult.check()
381 1
        return response.Results
382
383 1
    def create_subscription(self, params, callback):
384 1
        self.logger.info("create_subscription")
385 1
        request = ua.CreateSubscriptionRequest()
386 1
        request.Parameters = params
387 1
        resp_fut = Future()
388 1
        mycallbak = partial(self._create_subscription_callback, callback, resp_fut)
389 1
        self._uasocket.send_request(request, mycallbak)
390 1
        return resp_fut.result(self._timeout)
391
392 1
    def _create_subscription_callback(self, pub_callback, resp_fut, data_fut):
393 1
        self.logger.info("_create_subscription_callback")
394 1
        data = data_fut.result()
395 1
        response = struct_from_binary(ua.CreateSubscriptionResponse, data)
396 1
        self.logger.debug(response)
397 1
        response.ResponseHeader.ServiceResult.check()
398 1
        self._publishcallbacks[response.Parameters.SubscriptionId] = pub_callback
399 1
        resp_fut.set_result(response.Parameters)
400
401 1
    def delete_subscriptions(self, subscriptionids):
402 1
        self.logger.info("delete_subscription")
403 1
        request = ua.DeleteSubscriptionsRequest()
404 1
        request.Parameters.SubscriptionIds = subscriptionids
405 1
        resp_fut = Future()
406 1
        mycallbak = partial(self._delete_subscriptions_callback, subscriptionids, resp_fut)
407 1
        self._uasocket.send_request(request, mycallbak)
408 1
        return resp_fut.result(self._timeout)
409
410 1
    def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut):
411 1
        self.logger.info("_delete_subscriptions_callback")
412 1
        data = data_fut.result()
413 1
        response = struct_from_binary(ua.DeleteSubscriptionsResponse, data)
414 1
        self.logger.debug(response)
415 1
        response.ResponseHeader.ServiceResult.check()
416 1
        for sid in subscriptionids:
417 1
            self._publishcallbacks.pop(sid)
418 1
        resp_fut.set_result(response.Results)
419
420 1
    def publish(self, acks=None):
421 1
        self.logger.info("publish")
422 1
        if acks is None:
423 1
            acks = []
424 1
        request = ua.PublishRequest()
425 1
        request.Parameters.SubscriptionAcknowledgements = acks
426 1
        self._uasocket.send_request(request, self._call_publish_callback, timeout=0)
427
428 1
    def _call_publish_callback(self, future):
429 1
        self.logger.info("call_publish_callback")
430 1
        data = future.result()
431
432
        # check if answer looks ok
433 1
        try:
434 1
            self._uasocket.check_answer(data, "while waiting for publish response")
435
        except BadTimeout: # Spec Part 4, 7.28
436
            self.publish()
437
            return
438
        except BadNoSubscription: # Spec Part 5, 13.8.1
439
            # BadNoSubscription is expected after deleting the last subscription.
440
            #
441
            # We should therefore also check for len(self._publishcallbacks) == 0, but
442
            # this gets us into trouble if a Publish response arrives before the
443
            # DeleteSubscription response.
444
            #
445
            # We could remove the callback already when sending the DeleteSubscription request,
446
            # but there are some legitimate reasons to keep them around, such as when the server
447
            # responds with "BadTimeout" and we should try again later instead of just removing
448
            # the subscription client-side.
449
            #
450
            # There are a variety of ways to act correctly, but the most practical solution seems
451
            # to be to just ignore any BadNoSubscription responses.
452
            self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
453
            return
454
455
        # parse publish response
456 1
        try:
457 1
            response = struct_from_binary(ua.PublishResponse, data)
458 1
            self.logger.debug(response)
459
        except Exception:
460
            # INFO: catching the exception here might be obsolete because we already
461
            #       catch BadTimeout above. However, it's not really clear what this code
462
            #       does so it stays in, doesn't seem to hurt.
463
            self.logger.exception("Error parsing notificatipn from server")
464
            self.publish([]) #send publish request ot server so he does stop sending notifications
465
            return
466
467
        # look for callback
468 1
        try:
469 1
            callback = self._publishcallbacks[response.Parameters.SubscriptionId]
470
        except KeyError:
471
            self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
472
            return
473
474
        # do callback
475 1
        try:
476 1
            callback(response.Parameters)
477
        except Exception:  # we call client code, catch everything!
478
            self.logger.exception("Exception while calling user callback: %s")
479
480 1
    def create_monitored_items(self, params):
481 1
        self.logger.info("create_monitored_items")
482 1
        request = ua.CreateMonitoredItemsRequest()
483 1
        request.Parameters = params
484 1
        data = self._uasocket.send_request(request)
485 1
        response = struct_from_binary(ua.CreateMonitoredItemsResponse, data)
486 1
        self.logger.debug(response)
487 1
        response.ResponseHeader.ServiceResult.check()
488 1
        return response.Results
489
490 1
    def delete_monitored_items(self, params):
491 1
        self.logger.info("delete_monitored_items")
492 1
        request = ua.DeleteMonitoredItemsRequest()
493 1
        request.Parameters = params
494 1
        data = self._uasocket.send_request(request)
495 1
        response = struct_from_binary(ua.DeleteMonitoredItemsResponse, data)
496 1
        self.logger.debug(response)
497 1
        response.ResponseHeader.ServiceResult.check()
498 1
        return response.Results
499
500 1
    def add_nodes(self, nodestoadd):
501 1
        self.logger.info("add_nodes")
502 1
        request = ua.AddNodesRequest()
503 1
        request.Parameters.NodesToAdd = nodestoadd
504 1
        data = self._uasocket.send_request(request)
505 1
        response = struct_from_binary(ua.AddNodesResponse, data)
506 1
        self.logger.debug(response)
507 1
        response.ResponseHeader.ServiceResult.check()
508 1
        return response.Results
509
510 1
    def add_references(self, refs):
511 1
        self.logger.info("add_references")
512 1
        request = ua.AddReferencesRequest()
513 1
        request.Parameters.ReferencesToAdd = refs
514 1
        data = self._uasocket.send_request(request)
515 1
        response = struct_from_binary(ua.AddReferencesResponse, data)
516 1
        self.logger.debug(response)
517 1
        response.ResponseHeader.ServiceResult.check()
518 1
        return response.Results
519
520 1
    def delete_references(self, refs):
521 1
        self.logger.info("delete")
522 1
        request = ua.DeleteReferencesRequest()
523 1
        request.Parameters.ReferencesToDelete = refs
524 1
        data = self._uasocket.send_request(request)
525 1
        response = struct_from_binary(ua.DeleteReferencesResponse, data)
526 1
        self.logger.debug(response)
527 1
        response.ResponseHeader.ServiceResult.check()
528 1
        return response.Parameters.Results
529
530
531 1
    def delete_nodes(self, params):
532 1
        self.logger.info("delete_nodes")
533 1
        request = ua.DeleteNodesRequest()
534 1
        request.Parameters = params
535 1
        data = self._uasocket.send_request(request)
536 1
        response = struct_from_binary(ua.DeleteNodesResponse, data)
537 1
        self.logger.debug(response)
538 1
        response.ResponseHeader.ServiceResult.check()
539 1
        return response.Results
540
541 1
    def call(self, methodstocall):
542 1
        request = ua.CallRequest()
543 1
        request.Parameters.MethodsToCall = methodstocall
544 1
        data = self._uasocket.send_request(request)
545 1
        response = struct_from_binary(ua.CallResponse, data)
546 1
        self.logger.debug(response)
547 1
        response.ResponseHeader.ServiceResult.check()
548 1
        return response.Results
549
550 1
    def history_read(self, params):
551
        self.logger.info("history_read")
552
        request = ua.HistoryReadRequest()
553
        request.Parameters = params
554
        data = self._uasocket.send_request(request)
555
        response = struct_from_binary(ua.HistoryReadResponse, data)
556
        self.logger.debug(response)
557
        response.ResponseHeader.ServiceResult.check()
558
        return response.Results
559
560 1
    def modify_monitored_items(self, params):
561
        self.logger.info("modify_monitored_items")
562
        request = ua.ModifyMonitoredItemsRequest()
563
        request.Parameters = params
564
        data = self._uasocket.send_request(request)
565
        response = struct_from_binary(ua.ModifyMonitoredItemsResponse, data)
566
        self.logger.debug(response)
567
        response.ResponseHeader.ServiceResult.check()
568
        return response.Results
569