Completed
Pull Request — master (#256)
by
unknown
04:00
created

UaClient.connect_socket()   A

Complexity

Conditions 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 6
ccs 4
cts 4
cp 1
crap 1
rs 9.4285
1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9 1
from functools import partial
10
11 1
from opcua import ua
12 1
from opcua.common import utils
13
from opcua.common.uaerrors import UaError
14
from opcua.common.uaerrors import BadNoSubscription, BadSessionClosed
15 1
16
17
class UASocketClient(object):
18
    """
19
    handle socket connection and send ua messages
20 1
    timeout is the timeout used while waiting for an ua answer from server
21 1
    """
22 1
    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy()):
        """
        Set up an unconnected client: no socket is opened and no thread is
        started here.

        timeout is stored and later used when waiting on response futures
        security_policy is handed to ua.SecureConnection
        """
        self.logger = logging.getLogger(__name__ + ".Socket")

        # receiving thread; created by start()
        self._thread = None
        # guards the request counters and the callback map
        self._lock = Lock()
        self.timeout = timeout

        # socket wrapper; created by connect_socket()
        self._socket = None
        self._do_stop = False

        self.authentication_token = ua.NodeId()

        # counters for outgoing requests and the futures waiting for answers,
        # keyed by request id
        self._request_id = 0
        self._request_handle = 0
        self._callbackmap = {}

        self._connection = ua.SecureConnection(security_policy)
34
35
    def start(self):
        """
        Launch the receiving thread (it runs self._run).

        connect_socket() calls this, so calling it directly
        should normally not be needed.
        """
        receiver = Thread(target=self._run)
        self._thread = receiver
        receiver.start()
43
44
    def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        Lower-level send: serialize request, register a future for the
        answer and write the message to the socket.

        timeout is the value written in the ua request header
        callback, when given, is attached to the future as done-callback
        returns the future registered for this request
        """
        with self._lock:
            request.RequestHeader = self._create_request_header(timeout)
            self.logger.debug("Sending: %s", request)
            try:
                payload = request.to_binary()
            except BaseException:
                # _create_request_header already incremented the request
                # handle; give it back before propagating the error
                self._request_handle -= 1
                raise

            self._request_id += 1
            request_id = self._request_id

            pending = Future()
            if callback:
                pending.add_done_callback(callback)
            self._callbackmap[request_id] = pending

            frame = self._connection.message_to_binary(payload, message_type=message_type, request_id=request_id)
            self._socket.write(frame)
        return pending
68
69
    def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        Send request to server.

        timeout is the value written in the ua request header
        With a callback the call returns None right after sending; without
        one it waits up to self.timeout for the answer, runs check_answer()
        on it and returns the raw response data.
        """
        pending = self._send_request(request, callback, timeout, message_type)
        if callback:
            # asynchronous use: the callback receives the future
            return None
        answer = pending.result(self.timeout)
        self.check_answer(answer, " in response to " + request.__class__.__name__)
        return answer
80 1
81 1
    def check_answer(self, data, context):
        """
        Inspect a response for a ServiceFault.

        Works on a copy of data so the caller's buffer is left untouched.
        Returns True when the response is not a ServiceFault. For a
        ServiceFault a warning is logged and the header's ServiceResult is
        checked (which may raise); False is returned if it does not raise.
        """
        buf = data.copy()
        typeid = ua.NodeId.from_binary(buf)
        is_fault = typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary)
        if not is_fault:
            return True
        self.logger.warning("ServiceFault from server received: %s", context)
        header = ua.ResponseHeader.from_binary(buf)
        header.ServiceResult.check()
        return False
90 1
91 1
    def _run(self):
        """
        Body of the receiving thread: keep calling _receive() until a stop
        is requested or the socket reports it has been closed.
        """
        self.logger.info("Thread started")
        while True:
            if self._do_stop:
                break
            try:
                self._receive()
            except ua.utils.SocketClosedException:
                self.logger.info("Socket has closed connection")
                break
            except UaError:
                # log and keep the loop running
                self.logger.exception("Protocol Error")
        self.logger.info("Thread ended")
102
103 1
    def _receive(self):
        """
        Read one message from the connection and dispatch it.

        ua.Message      -> resolves the future registered under its request id
        ua.Acknowledge  -> resolves the future registered under id 0 (see send_hello)
        ua.ErrorMessage -> logged as a warning
        None            -> nothing to do
        Anything else raises ua.UaError.
        """
        msg = self._connection.receive_from_socket(self._socket)
        if msg is None:
            return
        elif isinstance(msg, ua.Message):
            self._call_callback(msg.request_id(), msg.body())
        elif isinstance(msg, ua.Acknowledge):
            self._call_callback(0, msg)
        elif isinstance(msg, ua.ErrorMessage):
            self.logger.warning("Received an error: %s", msg)
        else:
            # exceptions do not interpolate logging-style arguments, so the
            # message has to be formatted before it is raised
            raise ua.UaError("Unsupported message type: {}".format(msg))
115 1
116
    def _call_callback(self, request_id, body):
        """
        Resolve the future registered for request_id with body.

        The future is removed from the callback map under the lock and
        resolved after the lock is released. Raises ua.UaError when no
        future is registered for request_id.
        """
        with self._lock:
            pending = self._callbackmap.pop(request_id, None)
            if pending is None:
                known_ids = self._callbackmap.keys()
                raise ua.UaError("No future object found for request: {}, callbacks in list are {}".format(request_id, known_ids))
        pending.set_result(body)
122 1
123 1
    def _create_request_header(self, timeout=1000):
        """
        Build a ua.RequestHeader carrying the current authentication token,
        a freshly incremented request handle and timeout as TimeoutHint.
        """
        header = ua.RequestHeader()
        header.AuthenticationToken = self.authentication_token
        # every header consumes one handle; _send_request gives it back if
        # serialization fails
        self._request_handle += 1
        handle = self._request_handle
        header.RequestHandle = handle
        header.TimeoutHint = timeout
        return header
130
131 1
    def connect_socket(self, host, port):
        """
        Open a TCP connection to (host, port), wrap it and start the
        receiving thread.
        """
        self.logger.info("opening connection")
        address = (host, port)
        raw_sock = socket.create_connection(address)
        # nodelay necessary to avoid packing in one frame, some servers do not like it
        raw_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        self._socket = utils.SocketWrapper(raw_sock)
        self.start()
140 1
141 1
    def disconnect_socket(self):
        """
        Ask the receiving loop to stop, then shut down the write side of
        the socket and close it.
        """
        self.logger.info("stop request")
        self._do_stop = True
        raw_sock = self._socket.socket
        raw_sock.shutdown(socket.SHUT_WR)
        raw_sock.close()
146 1
147 1
    def send_hello(self, url):
        """
        Send a Hello message for url and wait up to self.timeout for the
        answer, which is returned.
        """
        hello = ua.Hello()
        hello.EndpointUrl = url

        # the answer to Hello is dispatched under request id 0 (see _receive)
        ack_future = Future()
        with self._lock:
            self._callbackmap[0] = ack_future

        frame = self._connection.tcp_to_binary(ua.MessageType.Hello, hello)
        self._socket.write(frame)
        return ack_future.result(self.timeout)
157 1
158 1
    def open_secure_channel(self, params):
        """
        Send an OpenSecureChannelRequest carrying params, wait up to
        self.timeout for the response, check its service result, hand the
        response parameters to the connection and return them.
        """
        self.logger.info("open_secure_channel")
        req = ua.OpenSecureChannelRequest()
        req.Parameters = params
        pending = self._send_request(req, message_type=ua.MessageType.SecureOpen)

        raw = pending.result(self.timeout)
        resp = ua.OpenSecureChannelResponse.from_binary(raw)
        resp.ResponseHeader.ServiceResult.check()
        self._connection.set_channel(resp.Parameters)
        return resp.Parameters
168
169
    def close_secure_channel(self):
        """
        Close the secure channel. This seems to trigger a shutdown of the
        socket in most servers, so be prepared to reconnect.
        OPC UA specs Part 6, 7.1.4 say that Server does not send a
        CloseSecureChannel response and should just close socket
        """
        self.logger.info("close_secure_channel")
        req = ua.CloseSecureChannelRequest()
        pending = self._send_request(req, message_type=ua.MessageType.SecureClose)
        with self._lock:
            # no further answers are expected: drop everything still pending
            pending.cancel()
            self._callbackmap.clear()

        # some servers send a response here, most do not ... so we ignore
184
185
186
class UaClient(object):
187
188
    """
189
    low level OPC-UA client.
190
191
    It implements (almost) all methods defined in opcua spec
192
    taking in argument the structures defined in opcua spec.
193
194 1
    In this Python implementation  most of the structures are defined in
195 1
    uaprotocol_auto.py and uaprotocol_hand.py available under opcua.ua
196
    """
197 1
198 1
    def __init__(self, timeout=1):
        """
        Create a client that is not yet connected.

        timeout is stored and later passed to the UASocketClient and used
        when waiting for subscription responses.
        """
        self.logger = logging.getLogger(__name__)
        # subscription id -> user callback
        # _publishcallbacks should be accessed in recv thread only
        self._publishcallbacks = {}
        self._timeout = timeout
        # created by connect_socket()
        self._uasocket = None
        self._security_policy = ua.SecurityPolicy()
205 1
206
    def set_security(self, policy):
207
        self._security_policy = policy
208
209 1
    def connect_socket(self, host, port):
        """
        Create a new UASocketClient with the stored timeout and security
        policy, then connect it to (host, port), which also starts its
        receiving thread.
        """
        uasocket = UASocketClient(self._timeout, security_policy=self._security_policy)
        self._uasocket = uasocket
        return uasocket.connect_socket(host, port)
215 1
216 1
    def disconnect_socket(self):
217
        return self._uasocket.disconnect_socket()
218 1
219 1
    def send_hello(self, url):
220
        return self._uasocket.send_hello(url)
221 1
222
    def open_secure_channel(self, params):
223
        return self._uasocket.open_secure_channel(params)
224
225
    def close_secure_channel(self):
226 1
        """
227
        close secure channel. It seems to trigger a shutdown of socket
228 1
        in most servers, so be prepare to reconnect
229 1
        """
230 1
        return self._uasocket.close_secure_channel()
231 1
232 1
    def create_session(self, parameters):
        """
        Send a CreateSessionRequest with parameters, check the service
        result, store the returned authentication token on the socket
        client and return the response parameters.
        """
        self.logger.info("create_session")
        req = ua.CreateSessionRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = ua.CreateSessionResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        session = resp.Parameters
        # the token is written into the header of every later request
        self._uasocket.authentication_token = session.AuthenticationToken
        return session
242 1
243 1
    def activate_session(self, parameters):
        """
        Send an ActivateSessionRequest with parameters, check the service
        result and return the response parameters.
        """
        self.logger.info("activate_session")
        req = ua.ActivateSessionRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = ua.ActivateSessionResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters
252 1
253 1
    def close_session(self, deletesubscriptions):
        """
        Send a CloseSessionRequest with DeleteSubscriptions set to
        deletesubscriptions. A BadSessionClosed service result is ignored;
        any other bad result raises. Returns nothing.
        """
        self.logger.info("close_session")
        req = ua.CloseSessionRequest()
        req.DeleteSubscriptions = deletesubscriptions
        raw = self._uasocket.send_request(req)
        resp = ua.CloseSessionResponse.from_binary(raw)
        result = resp.ResponseHeader.ServiceResult
        try:
            result.check()
        except BadSessionClosed:
            # Closing the session while publish requests are still open leads to
            # BadSessionClosed responses, so that result is simply ignored.
            # The alternative would be to make sure no publish request is in
            # flight when the session gets closed.
            pass
267 1
268 1
    def browse(self, parameters):
        """
        Send a BrowseRequest with parameters, check the service result and
        return the response's Results.
        """
        self.logger.info("browse")
        req = ua.BrowseRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = ua.BrowseResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
277 1
278
    def read(self, parameters):
        """
        Send a ReadRequest with parameters, check the service result and
        return the response's Results.

        Good results for the NodeClass attribute are converted to
        ua.NodeClass; good results for the ValueRank attribute are converted
        to ua.ValueRank when the value is one of -3..4.
        """
        self.logger.info("read")
        req = ua.ReadRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = ua.ReadResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()

        # cast to Enum the attributes that need it; results are matched to
        # the requested nodes by position
        known_ranks = (-3, -2, -1, 0, 1, 2, 3, 4)
        for position, node in enumerate(parameters.NodesToRead):
            if node.AttributeId == ua.AttributeIds.NodeClass:
                datavalue = resp.Results[position]
                if datavalue.StatusCode.is_good():
                    datavalue.Value.Value = ua.NodeClass(datavalue.Value.Value)
            elif node.AttributeId == ua.AttributeIds.ValueRank:
                datavalue = resp.Results[position]
                if datavalue.StatusCode.is_good() and datavalue.Value.Value in known_ranks:
                    datavalue.Value.Value = ua.ValueRank(datavalue.Value.Value)
        return resp.Results
297 1
298 1
    def write(self, params):
        """
        Send a WriteRequest with params, check the service result and
        return the response's Results.
        """
        # log the actual service name (this used to log "read")
        self.logger.info("write")
        request = ua.WriteRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.WriteResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results
307 1
308 1
    def get_endpoints(self, params):
        """
        Send a GetEndpointsRequest with params, check the service result
        and return the response's Endpoints.
        """
        self.logger.info("get_endpoint")
        req = ua.GetEndpointsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.GetEndpointsResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Endpoints
317 1
318
    def find_servers(self, params):
        """
        Send a FindServersRequest with params, check the service result
        and return the response's Servers.
        """
        self.logger.info("find_servers")
        req = ua.FindServersRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.FindServersResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Servers
327 1
328 1
    def find_servers_on_network(self, params):
        """
        Send a FindServersOnNetworkRequest with params, check the service
        result and return the response's Parameters.
        """
        self.logger.info("find_servers_on_network")
        req = ua.FindServersOnNetworkRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.FindServersOnNetworkResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters
337 1
338
    def register_server(self, registered_server):
        """
        Send a RegisterServerRequest whose Server is registered_server and
        check the service result. Returns nothing.
        """
        self.logger.info("register_server")
        req = ua.RegisterServerRequest()
        req.Server = registered_server
        raw = self._uasocket.send_request(req)
        resp = ua.RegisterServerResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        # this service has no payload to hand back
347 1
348 1
    def register_server2(self, params):
        """
        Send a RegisterServer2Request with params, check the service
        result and return the response's ConfigurationResults.
        """
        self.logger.info("register_server2")
        req = ua.RegisterServer2Request()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.RegisterServer2Response.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.ConfigurationResults
357 1
358 1
    def translate_browsepaths_to_nodeids(self, browsepaths):
        """
        Send a TranslateBrowsePathsToNodeIdsRequest for browsepaths, check
        the service result and return the response's Results.
        """
        self.logger.info("translate_browsepath_to_nodeid")
        req = ua.TranslateBrowsePathsToNodeIdsRequest()
        req.Parameters.BrowsePaths = browsepaths
        raw = self._uasocket.send_request(req)
        resp = ua.TranslateBrowsePathsToNodeIdsResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
367 1
368 1
    def create_subscription(self, params, callback):
        """
        Send a CreateSubscriptionRequest with params and wait up to the
        client timeout for the result.

        The response is handled by _create_subscription_callback, which
        registers callback for the new subscription and resolves the future
        this method waits on.
        """
        self.logger.info("create_subscription")
        req = ua.CreateSubscriptionRequest()
        req.Parameters = params
        result_future = Future()
        on_response = partial(self._create_subscription_callback, callback, result_future)
        self._uasocket.send_request(req, on_response)
        return result_future.result(self._timeout)
376 1
377 1
    def _create_subscription_callback(self, pub_callback, resp_fut, data_fut):
        """
        Done-callback for the CreateSubscription request.

        Parses the response held by data_fut, checks the service result,
        registers pub_callback under the new subscription id and resolves
        resp_fut with the response parameters.
        """
        self.logger.info("_create_subscription_callback")
        raw = data_fut.result()
        resp = ua.CreateSubscriptionResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        subscription_id = resp.Parameters.SubscriptionId
        self._publishcallbacks[subscription_id] = pub_callback
        resp_fut.set_result(resp.Parameters)
385 1
386 1
    def delete_subscriptions(self, subscriptionids):
        """
        Send a DeleteSubscriptionsRequest for subscriptionids and wait up
        to the client timeout for the result.

        The response is handled by _delete_subscriptions_callback, which
        resolves the future this method waits on.
        """
        self.logger.info("delete_subscription")
        req = ua.DeleteSubscriptionsRequest()
        req.Parameters.SubscriptionIds = subscriptionids
        result_future = Future()
        on_response = partial(self._delete_subscriptions_callback, subscriptionids, result_future)
        self._uasocket.send_request(req, on_response)
        return result_future.result(self._timeout)
394 1
395 1
    def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut):
        """
        Done-callback for the DeleteSubscriptions request.

        Parses the response held by data_fut, checks the service result,
        forgets the publish callbacks of subscriptionids and resolves
        resp_fut with the response's Results.
        """
        self.logger.info("_delete_subscriptions_callback")
        data = data_fut.result()
        response = ua.DeleteSubscriptionsResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        for sid in subscriptionids:
            # Tolerate ids without a registered callback: a KeyError raised
            # here would be swallowed by the Future callback machinery and
            # resp_fut would never be resolved, so the caller would only
            # see a timeout.
            self._publishcallbacks.pop(sid, None)
        resp_fut.set_result(response.Results)
404 1
405 1
    def publish(self, acks=None):
        """
        Send a PublishRequest carrying acks (an empty list when acks is
        None). The answer is handled asynchronously by
        _call_publish_callback; nothing is returned.
        """
        self.logger.info("publish")
        acknowledgements = [] if acks is None else acks
        req = ua.PublishRequest()
        req.Parameters.SubscriptionAcknowledgements = acknowledgements
        # timeout could be set to 0 but some servers do not support it
        self._uasocket.send_request(req, self._call_publish_callback, timeout=int(9e8))
412
413 1
    def _call_publish_callback(self, future):
        """
        Done-callback for a Publish request.

        Checks the raw answer, parses it as a PublishResponse and passes
        the response parameters to the callback registered for the
        subscription id. BadNoSubscription answers, responses for unknown
        subscriptions and exceptions raised by the user callback are logged
        and otherwise ignored.
        """
        self.logger.info("call_publish_callback")
        data = future.result()

        # check if answer looks ok
        try:
            self._uasocket.check_answer(data, "during waiting for publish response")
        except BadNoSubscription:
            # NOTE: check_answer() logs the answer even though we catch the exception

            self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
            # Spec Part 5, Paragraph 13.8.1
            # BadNoSubscription is expected after deleting the last subscription.
            #
            # We should therefore also check for len(self._publishcallbacks) == 0, but
            # this gets us into trouble if a Publish response arrives before the
            # DeleteSubscription response.
            #
            # We could remove the callback already when sending the DeleteSubscription request,
            # but there are some legitimate reasons to keep them around, such as when the server
            # responds with "BadTimeout" and we should try again later instead of just removing
            # the subscription client-side.
            #
            # There are a variety of ways to act correctly, but the most practical solution seems
            # to be to just ignore any BadNoSubscription responses.
            return

        # parse publish response
        try:
            response = ua.PublishResponse.from_binary(data)
            self.logger.debug(response)
        except Exception:
            self.logger.exception("Error parsing notificatipn from server")
            # issue a fresh publish request with no acknowledgements;
            # presumably so the server keeps sending notifications -- TODO confirm
            self.publish([])
            return

        # look for callback
        try:
            callback = self._publishcallbacks[response.Parameters.SubscriptionId]
        except KeyError:
            self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
            return

        # do callback
        try:
            callback(response.Parameters)
        except Exception:  # we call client code, catch everything!
            # the message has a %s placeholder, so supply the callback for it
            self.logger.exception("Exception while calling user callback: %s", callback)
461
462 1
    def create_monitored_items(self, params):
        """
        Send a CreateMonitoredItemsRequest with params, check the service
        result and return the response's Results.
        """
        self.logger.info("create_monitored_items")
        req = ua.CreateMonitoredItemsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.CreateMonitoredItemsResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
471 1
472
    def delete_monitored_items(self, params):
        """
        Send a DeleteMonitoredItemsRequest with params, check the service
        result and return the response's Results.
        """
        self.logger.info("delete_monitored_items")
        req = ua.DeleteMonitoredItemsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.DeleteMonitoredItemsResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
481
482
    def add_nodes(self, nodestoadd):
        """
        Send an AddNodesRequest for nodestoadd, check the service result
        and return the response's Results.
        """
        self.logger.info("add_nodes")
        req = ua.AddNodesRequest()
        req.Parameters.NodesToAdd = nodestoadd
        raw = self._uasocket.send_request(req)
        resp = ua.AddNodesResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
491
492
    def delete_nodes(self, nodestodelete):
        """
        Send a DeleteNodesRequest for nodestodelete, check the service
        result and return the response's Results.
        """
        self.logger.info("delete_nodes")
        req = ua.DeleteNodesRequest()
        req.Parameters.NodesToDelete = nodestodelete
        raw = self._uasocket.send_request(req)
        resp = ua.DeleteNodesResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
501
502
    def call(self, methodstocall):
        """
        Send a CallRequest for methodstocall, check the service result and
        return the response's Results.
        """
        # every other service method logs its name on entry; do the same here
        self.logger.info("call")
        request = ua.CallRequest()
        request.Parameters.MethodsToCall = methodstocall
        data = self._uasocket.send_request(request)
        response = ua.CallResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results
510
511
    def history_read(self, params):
        """
        Send a HistoryReadRequest with params, check the service result
        and return the response's Results.
        """
        self.logger.info("history_read")
        req = ua.HistoryReadRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.HistoryReadResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
520
521
    def modify_monitored_items(self, params):
        """
        Send a ModifyMonitoredItemsRequest with params, check the service
        result and return the response's Results.
        """
        self.logger.info("modify_monitored_items")
        req = ua.ModifyMonitoredItemsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.ModifyMonitoredItemsResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
530