Completed
Pull Request — master (#509)
by
unknown
03:15
created

UASocketClient.connect_socket()   A

Complexity

Conditions 1

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.5786

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 9
ccs 1
cts 6
cp 0.1666
crap 1.5786
rs 9.6666
1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9 1
from functools import partial
10
11 1
from opcua import ua
12 1
from opcua.ua.ua_binary import struct_from_binary, uatcp_to_binary, struct_to_binary, nodeid_from_binary
13 1
from opcua.ua.uaerrors import UaError, BadTimeout, BadNoSubscription, BadSessionClosed
14 1
from opcua.common.connection import SecureConnection
15
16
17 1
class UASocketClient(object):
18
    """
19
    handle socket connection and send ua messages
20
    timeout is the timeout used while waiting for an ua answer from server
21
    """
22 1
    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy()):
23
        self.logger = logging.getLogger(__name__ + ".Socket")
24
        self._thread = None
25
        self._lock = Lock()
26
        self.timeout = timeout
27
        self._socket = None
28
        self._do_stop = False
29
        self.authentication_token = ua.NodeId()
30
        self._request_id = 0
31
        self._request_handle = 0
32
        self._callbackmap = {}
33
        self._connection = SecureConnection(security_policy)
34
35 1
    def start(self):
36
        """
37
        Start receiving thread.
38
        this is called automatically in connect and
39
        should not be necessary to call directly
40
        """
41
        self._thread = Thread(target=self._run)
42
        self._thread.start()
43
44 1
    def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
45
        """
46
        send request to server, lower-level method
47
        timeout is the timeout written in ua header
48
        returns future
49
        """
50
        with self._lock:
51
            request.RequestHeader = self._create_request_header(timeout)
52
            self.logger.debug("Sending: %s", request)
53
            try:
54
                binreq = struct_to_binary(request)
55
            except:
56
                # reset reqeust handle if any error
57
                # see self._create_request_header
58
                self._request_handle -= 1
59
                raise
60
            self._request_id += 1
61
            future = Future()
62
            if callback:
63
                future.add_done_callback(callback)
64
            self._callbackmap[self._request_id] = future
65
            msg = self._connection.message_to_binary(binreq, message_type=message_type, request_id=self._request_id)
66
            self._socket.write(msg)
67
        return future
68
69 1
    def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
70
        """
71
        send request to server.
72
        timeout is the timeout written in ua header
73
        returns response object if no callback is provided
74
        """
75
        future = self._send_request(request, callback, timeout, message_type)
76
        if not callback:
77
            data = future.result(self.timeout)
78
            self.check_answer(data, " in response to " + request.__class__.__name__)
79
            return data
80
81 1
    def check_answer(self, data, context):
82
        data = data.copy()
83
        typeid = nodeid_from_binary(data)
84
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
85
            self.logger.warning("ServiceFault from server received %s", context)
86
            hdr = struct_from_binary(ua.ResponseHeader, data)
87
            hdr.ServiceResult.check()
88
            return False
89
        return True
90
91 1
    def _run(self):
92
        self.logger.info("Thread started")
93
        while not self._do_stop:
94
            try:
95
                self._receive()
96
            except ua.utils.SocketClosedException:
97
                self.logger.info("Socket has closed connection")
98
                break
99
            except UaError:
100
                self.logger.exception("Protocol Error")
101
        self.logger.info("Thread ended")
102
103 1
    def _receive(self):
104
        msg = self._connection.receive_from_socket(self._socket)
105
        if msg is None:
106
            return
107
        elif isinstance(msg, ua.Message):
108
            self._call_callback(msg.request_id(), msg.body())
109
        elif isinstance(msg, ua.Acknowledge):
110
            self._call_callback(0, msg)
111
        elif isinstance(msg, ua.ErrorMessage):
112
            self.logger.warning("Received an error: %s", msg)
113
        else:
114
            raise ua.UaError("Unsupported message type: %s", msg)
115
116 1
    def _call_callback(self, request_id, body):
117
        with self._lock:
118
            future = self._callbackmap.pop(request_id, None)
119
            if future is None:
120
                raise ua.UaError("No future object found for request: {0}, callbacks in list are {1}".format(request_id, self._callbackmap.keys()))
121
        future.set_result(body)
122
123 1
    def _create_request_header(self, timeout=1000):
124
        hdr = ua.RequestHeader()
125
        hdr.AuthenticationToken = self.authentication_token
126
        self._request_handle += 1
127
        hdr.RequestHandle = self._request_handle
128
        hdr.TimeoutHint = timeout
129
        return hdr
130
131 1
    def connect_socket(self, host, port):
132
        """
133
        connect to server socket and start receiving thread
134
        """
135
        self.logger.info("opening connection")
136
        sock = socket.create_connection((host, port))
137
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # nodelay ncessary to avoid packing in one frame, some servers do not like it
138
        self._socket = ua.utils.SocketWrapper(sock)
139
        self.start()
140
141 1
    def disconnect_socket(self):
142
        self.logger.info("stop request")
143
        self._do_stop = True
144
        self._socket.socket.shutdown(socket.SHUT_RDWR)
145
        self._socket.socket.close()
146
147 1
    def send_hello(self, url):
148
        hello = ua.Hello()
149
        hello.EndpointUrl = url
150
        future = Future()
151
        with self._lock:
152
            self._callbackmap[0] = future
153
        binmsg = uatcp_to_binary(ua.MessageType.Hello, hello)
154
        self._socket.write(binmsg)
155
        ack = future.result(self.timeout)
156
        return ack
157
158 1
    def open_secure_channel(self, params):
159
        self.logger.info("open_secure_channel")
160
        request = ua.OpenSecureChannelRequest()
161
        request.Parameters = params
162
        future = self._send_request(request, message_type=ua.MessageType.SecureOpen)
163
        
164
        # FIXME: we have a race condition here
165
        # we can get a packet with the new token id before we reach to store it..
166
        response = struct_from_binary(ua.OpenSecureChannelResponse, future.result(self.timeout))
167
        response.ResponseHeader.ServiceResult.check()
168
        self._connection.set_channel(response.Parameters)
169
        return response.Parameters
170
171 1
    def close_secure_channel(self):
172
        """
173
        close secure channel. It seems to trigger a shutdown of socket
174
        in most servers, so be prepare to reconnect.
175
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close socket
176
        """
177
        self.logger.info("close_secure_channel")
178
        request = ua.CloseSecureChannelRequest()
179
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
180
        with self._lock:
181
            # don't expect any more answers
182
            future.cancel()
183
            self._callbackmap.clear()
184
185
        # some servers send a response here, most do not ... so we ignore
186
187
188 1
class UaClient(object):
189
190
    """
191
    low level OPC-UA client.
192
193
    It implements (almost) all methods defined in opcua spec
194
    taking in argument the structures defined in opcua spec.
195
196
    In this Python implementation  most of the structures are defined in
197
    uaprotocol_auto.py and uaprotocol_hand.py available under opcua.ua
198
    """
199
200 1
    def __init__(self, timeout=1):
201
        self.logger = logging.getLogger(__name__)
202
        # _publishcallbacks should be accessed in recv thread only
203
        self._publishcallbacks = {}
204
        self._timeout = timeout
205
        self._uasocket = None
206
        self.security_policy = ua.SecurityPolicy()
207
208 1
    def set_security(self, policy):
209
        self.security_policy = policy
210
211 1
    def connect_socket(self, host, port):
212
        """
213
        connect to server socket and start receiving thread
214
        """
215
        self._uasocket = UASocketClient(self._timeout, security_policy=self.security_policy)
216
        return self._uasocket.connect_socket(host, port)
217
218 1
    def disconnect_socket(self):
219
        return self._uasocket.disconnect_socket()
220
221 1
    def send_hello(self, url):
222
        return self._uasocket.send_hello(url)
223
224 1
    def open_secure_channel(self, params):
225
        return self._uasocket.open_secure_channel(params)
226
227 1
    def close_secure_channel(self):
228
        """
229
        close secure channel. It seems to trigger a shutdown of socket
230
        in most servers, so be prepare to reconnect
231
        """
232
        return self._uasocket.close_secure_channel()
233
234 1
    def create_session(self, parameters):
235
        self.logger.info("create_session")
236
        request = ua.CreateSessionRequest()
237
        request.Parameters = parameters
238
        data = self._uasocket.send_request(request)
239
        response = struct_from_binary(ua.CreateSessionResponse, data)
240
        self.logger.debug(response)
241
        response.ResponseHeader.ServiceResult.check()
242
        self._uasocket.authentication_token = response.Parameters.AuthenticationToken
243
        return response.Parameters
244
245 1
    def activate_session(self, parameters):
246
        self.logger.info("activate_session")
247
        request = ua.ActivateSessionRequest()
248
        request.Parameters = parameters
249
        data = self._uasocket.send_request(request)
250
        response = struct_from_binary(ua.ActivateSessionResponse, data)
251
        self.logger.debug(response)
252
        response.ResponseHeader.ServiceResult.check()
253
        return response.Parameters
254
255 1
    def close_session(self, deletesubscriptions):
256
        self.logger.info("close_session")
257
        request = ua.CloseSessionRequest()
258
        request.DeleteSubscriptions = deletesubscriptions
259
        data = self._uasocket.send_request(request)
260
        response = struct_from_binary(ua.CloseSessionResponse, data)
261
        try:
262
            response.ResponseHeader.ServiceResult.check()
263
        except BadSessionClosed:
264
            # Problem: closing the session with open publish requests leads to BadSessionClosed responses
265
            #          we can just ignore it therefore.
266
            #          Alternatively we could make sure that there are no publish requests in flight when
267
            #          closing the session.
268
            pass
269
270 1
    def browse(self, parameters):
271
        self.logger.info("browse")
272
        request = ua.BrowseRequest()
273
        request.Parameters = parameters
274
        data = self._uasocket.send_request(request)
275
        response = struct_from_binary(ua.BrowseResponse, data)
276
        self.logger.debug(response)
277
        response.ResponseHeader.ServiceResult.check()
278
        return response.Results
279
280 1
    def browse_next(self, parameters):
281
        self.logger.info("browse next")
282
        request = ua.BrowseNextRequest()
283
        request.Parameters = parameters
284
        data = self._uasocket.send_request(request)
285
        response = struct_from_binary(ua.BrowseNextResponse, data)
286
        self.logger.debug(response)
287
        response.ResponseHeader.ServiceResult.check()
288
        return response.Parameters.Results
289
290 1
    def read(self, parameters):
291
        self.logger.info("read")
292
        request = ua.ReadRequest()
293
        request.Parameters = parameters
294
        data = self._uasocket.send_request(request)
295
        response = struct_from_binary(ua.ReadResponse, data)
296
        self.logger.debug(response)
297
        response.ResponseHeader.ServiceResult.check()
298
        # cast to Enum attributes that need to
299
        for idx, rv in enumerate(parameters.NodesToRead):
300
            if rv.AttributeId == ua.AttributeIds.NodeClass:
301
                dv = response.Results[idx]
302
                if dv.StatusCode.is_good():
303
                    dv.Value.Value = ua.NodeClass(dv.Value.Value)
304
            elif rv.AttributeId == ua.AttributeIds.ValueRank:
305
                dv = response.Results[idx]
306
                if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
307
                    dv.Value.Value = ua.ValueRank(dv.Value.Value)
308
        return response.Results
309
310 1
    def write(self, params):
311
        self.logger.info("read")
312
        request = ua.WriteRequest()
313
        request.Parameters = params
314
        data = self._uasocket.send_request(request)
315
        response = struct_from_binary(ua.WriteResponse, data)
316
        self.logger.debug(response)
317
        response.ResponseHeader.ServiceResult.check()
318
        return response.Results
319
320 1
    def get_endpoints(self, params):
321
        self.logger.info("get_endpoint")
322
        request = ua.GetEndpointsRequest()
323
        request.Parameters = params
324
        data = self._uasocket.send_request(request)
325
        response = struct_from_binary(ua.GetEndpointsResponse, data)
326
        self.logger.debug(response)
327
        response.ResponseHeader.ServiceResult.check()
328
        return response.Endpoints
329
330 1
    def find_servers(self, params):
331
        self.logger.info("find_servers")
332
        request = ua.FindServersRequest()
333
        request.Parameters = params
334
        data = self._uasocket.send_request(request)
335
        response = struct_from_binary(ua.FindServersResponse, data)
336
        self.logger.debug(response)
337
        response.ResponseHeader.ServiceResult.check()
338
        return response.Servers
339
340 1
    def find_servers_on_network(self, params):
341
        self.logger.info("find_servers_on_network")
342
        request = ua.FindServersOnNetworkRequest()
343
        request.Parameters = params
344
        data = self._uasocket.send_request(request)
345
        response = struct_from_binary(ua.FindServersOnNetworkResponse, data)
346
        self.logger.debug(response)
347
        response.ResponseHeader.ServiceResult.check()
348
        return response.Parameters
349
350 1
    def register_server(self, registered_server):
351
        self.logger.info("register_server")
352
        request = ua.RegisterServerRequest()
353
        request.Server = registered_server
354
        data = self._uasocket.send_request(request)
355
        response = struct_from_binary(ua.RegisterServerResponse, data)
356
        self.logger.debug(response)
357
        response.ResponseHeader.ServiceResult.check()
358
        # nothing to return for this service
359
360 1
    def register_server2(self, params):
361
        self.logger.info("register_server2")
362
        request = ua.RegisterServer2Request()
363
        request.Parameters = params
364
        data = self._uasocket.send_request(request)
365
        response = struct_from_binary(ua.RegisterServer2Response, data)
366
        self.logger.debug(response)
367
        response.ResponseHeader.ServiceResult.check()
368
        return response.ConfigurationResults
369
370 1
    def translate_browsepaths_to_nodeids(self, browsepaths):
371
        self.logger.info("translate_browsepath_to_nodeid")
372
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
373
        request.Parameters.BrowsePaths = browsepaths
374
        data = self._uasocket.send_request(request)
375
        response = struct_from_binary(ua.TranslateBrowsePathsToNodeIdsResponse, data)
376
        self.logger.debug(response)
377
        response.ResponseHeader.ServiceResult.check()
378
        return response.Results
379
380 1
    def create_subscription(self, params, callback):
381
        self.logger.info("create_subscription")
382
        request = ua.CreateSubscriptionRequest()
383
        request.Parameters = params
384
        resp_fut = Future()
385
        mycallbak = partial(self._create_subscription_callback, callback, resp_fut)
386
        self._uasocket.send_request(request, mycallbak)
387
        return resp_fut.result(self._timeout)
388
389 1
    def _create_subscription_callback(self, pub_callback, resp_fut, data_fut):
390
        self.logger.info("_create_subscription_callback")
391
        data = data_fut.result()
392
        response = struct_from_binary(ua.CreateSubscriptionResponse, data)
393
        self.logger.debug(response)
394
        response.ResponseHeader.ServiceResult.check()
395
        self._publishcallbacks[response.Parameters.SubscriptionId] = pub_callback
396
        resp_fut.set_result(response.Parameters)
397
398 1
    def delete_subscriptions(self, subscriptionids):
399
        self.logger.info("delete_subscription")
400
        request = ua.DeleteSubscriptionsRequest()
401
        request.Parameters.SubscriptionIds = subscriptionids
402
        resp_fut = Future()
403
        mycallbak = partial(self._delete_subscriptions_callback, subscriptionids, resp_fut)
404
        self._uasocket.send_request(request, mycallbak)
405
        return resp_fut.result(self._timeout)
406
407 1
    def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut):
408
        self.logger.info("_delete_subscriptions_callback")
409
        data = data_fut.result()
410
        response = struct_from_binary(ua.DeleteSubscriptionsResponse, data)
411
        self.logger.debug(response)
412
        response.ResponseHeader.ServiceResult.check()
413
        for sid in subscriptionids:
414
            self._publishcallbacks.pop(sid)
415
        resp_fut.set_result(response.Results)
416
417 1
    def publish(self, acks=None):
418
        self.logger.info("publish")
419
        if acks is None:
420
            acks = []
421
        request = ua.PublishRequest()
422
        request.Parameters.SubscriptionAcknowledgements = acks
423
        # timeout could be set to 0 (= no timeout) but some servers do not support it
424
        self._uasocket.send_request(request, self._call_publish_callback, timeout=int(9e8)) # 250 days
425
426 1
    def _call_publish_callback(self, future):
427
        self.logger.info("call_publish_callback")
428
        data = future.result()
429
430
        # check if answer looks ok
431
        try:
432
            self._uasocket.check_answer(data, "while waiting for publish response")
433
        except BadTimeout: # Spec Part 4, 7.28
434
            self.publish()
435
            return
436
        except BadNoSubscription: # Spec Part 5, 13.8.1
437
            # BadNoSubscription is expected after deleting the last subscription.
438
            #
439
            # We should therefore also check for len(self._publishcallbacks) == 0, but
440
            # this gets us into trouble if a Publish response arrives before the
441
            # DeleteSubscription response.
442
            #
443
            # We could remove the callback already when sending the DeleteSubscription request,
444
            # but there are some legitimate reasons to keep them around, such as when the server
445
            # responds with "BadTimeout" and we should try again later instead of just removing
446
            # the subscription client-side.
447
            #
448
            # There are a variety of ways to act correctly, but the most practical solution seems
449
            # to be to just ignore any BadNoSubscription responses.
450
            self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
451
            return
452
453
        # parse publish response
454
        try:
455
            response = struct_from_binary(ua.PublishResponse, data)
456
            self.logger.debug(response)
457
        except Exception:
458
            # INFO: catching the exception here might be obsolete because we already
459
            #       catch BadTimeout above. However, it's not really clear what this code
460
            #       does so it stays in, doesn't seem to hurt.
461
            self.logger.exception("Error parsing notificatipn from server")
462
            self.publish([]) #send publish request ot server so he does stop sending notifications
463
            return
464
465
        # look for callback
466
        try:
467
            callback = self._publishcallbacks[response.Parameters.SubscriptionId]
468
        except KeyError:
469
            self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
470
            return
471
472
        # do callback
473
        try:
474
            callback(response.Parameters)
475
        except Exception:  # we call client code, catch everything!
476
            self.logger.exception("Exception while calling user callback: %s")
477
478 1
    def create_monitored_items(self, params):
479
        self.logger.info("create_monitored_items")
480
        request = ua.CreateMonitoredItemsRequest()
481
        request.Parameters = params
482
        data = self._uasocket.send_request(request)
483
        response = struct_from_binary(ua.CreateMonitoredItemsResponse, data)
484
        self.logger.debug(response)
485
        response.ResponseHeader.ServiceResult.check()
486
        return response.Results
487
488 1
    def delete_monitored_items(self, params):
489
        self.logger.info("delete_monitored_items")
490
        request = ua.DeleteMonitoredItemsRequest()
491
        request.Parameters = params
492
        data = self._uasocket.send_request(request)
493
        response = struct_from_binary(ua.DeleteMonitoredItemsResponse, data)
494
        self.logger.debug(response)
495
        response.ResponseHeader.ServiceResult.check()
496
        return response.Results
497
498 1
    def add_nodes(self, nodestoadd):
499
        self.logger.info("add_nodes")
500
        request = ua.AddNodesRequest()
501
        request.Parameters.NodesToAdd = nodestoadd
502
        data = self._uasocket.send_request(request)
503
        response = struct_from_binary(ua.AddNodesResponse, data)
504
        self.logger.debug(response)
505
        response.ResponseHeader.ServiceResult.check()
506
        return response.Results
507
508 1
    def add_references(self, refs):
509
        self.logger.info("add_references")
510
        request = ua.AddReferencesRequest()
511
        request.Parameters.ReferencesToAdd = refs
512
        data = self._uasocket.send_request(request)
513
        response = struct_from_binary(ua.AddReferencesResponse, data)
514
        self.logger.debug(response)
515
        response.ResponseHeader.ServiceResult.check()
516
        return response.Results
517
518 1
    def delete_references(self, refs):
519
        self.logger.info("delete")
520
        request = ua.DeleteReferencesRequest()
521
        request.Parameters.ReferencesToDelete = refs
522
        data = self._uasocket.send_request(request)
523
        response = struct_from_binary(ua.DeleteReferencesResponse, data)
524
        self.logger.debug(response)
525
        response.ResponseHeader.ServiceResult.check()
526
        return response.Parameters.Results
527
528
529 1
    def delete_nodes(self, params):
530
        self.logger.info("delete_nodes")
531
        request = ua.DeleteNodesRequest()
532
        request.Parameters = params
533
        data = self._uasocket.send_request(request)
534
        response = struct_from_binary(ua.DeleteNodesResponse, data)
535
        self.logger.debug(response)
536
        response.ResponseHeader.ServiceResult.check()
537
        return response.Results
538
539 1
    def call(self, methodstocall):
540
        request = ua.CallRequest()
541
        request.Parameters.MethodsToCall = methodstocall
542
        data = self._uasocket.send_request(request)
543
        response = struct_from_binary(ua.CallResponse, data)
544
        self.logger.debug(response)
545
        response.ResponseHeader.ServiceResult.check()
546
        return response.Results
547
548 1
    def history_read(self, params):
549
        self.logger.info("history_read")
550
        request = ua.HistoryReadRequest()
551
        request.Parameters = params
552
        data = self._uasocket.send_request(request)
553
        response = struct_from_binary(ua.HistoryReadResponse, data)
554
        self.logger.debug(response)
555
        response.ResponseHeader.ServiceResult.check()
556
        return response.Results
557
558 1
    def modify_monitored_items(self, params):
559
        self.logger.info("modify_monitored_items")
560
        request = ua.ModifyMonitoredItemsRequest()
561
        request.Parameters = params
562
        data = self._uasocket.send_request(request)
563
        response = struct_from_binary(ua.ModifyMonitoredItemsResponse, data)
564
        self.logger.debug(response)
565
        response.ResponseHeader.ServiceResult.check()
566
        return response.Results
567