Passed
Push — master ( 0b26fe...78dc7d )
by Olivier
04:21
created

UaClient.find_servers_on_network()   A

Complexity

Conditions 1

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.7023

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 9
ccs 1
cts 9
cp 0.1111
crap 1.7023
rs 9.6666
1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9 1
from functools import partial
10
11 1
from opcua import ua
12 1
from opcua.common import utils
13 1
from opcua.ua.uaerrors import UaError, BadTimeout, BadNoSubscription, BadSessionClosed
14
15
16 1
class UASocketClient(object):
    """
    Handle the socket connection and send ua messages.

    timeout is the timeout used while waiting for an ua answer from server
    (it is the value handed to ``Future.result``).
    """

    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy()):
        self.logger = logging.getLogger(__name__ + ".Socket")
        self._thread = None
        # guards the request counters and _callbackmap, which are touched both
        # by callers sending requests and by the receiving thread
        self._lock = Lock()
        self.timeout = timeout
        self._socket = None
        self._do_stop = False
        self.authentication_token = ua.NodeId()
        self._request_id = 0
        self._request_handle = 0
        # request id -> Future, resolved in _call_callback when the answer arrives
        self._callbackmap = {}
        self._connection = ua.SecureConnection(security_policy)

    def start(self):
        """
        Start receiving thread.
        this is called automatically in connect and
        should not be necessary to call directly
        """
        self._thread = Thread(target=self._run)
        self._thread.start()

    def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server, lower-level method
        timeout is the timeout written in ua header
        callback, if given, is attached to the future as a done-callback
        returns future
        """
        with self._lock:
            request.RequestHeader = self._create_request_header(timeout)
            self.logger.debug("Sending: %s", request)
            try:
                binreq = request.to_binary()
            except:
                # reset request handle if any error
                # see self._create_request_header
                # (bare except is deliberate: the error is always re-raised)
                self._request_handle -= 1
                raise
            self._request_id += 1
            future = Future()
            if callback:
                future.add_done_callback(callback)
            # register before writing so the receiving thread can find the future
            self._callbackmap[self._request_id] = future
            msg = self._connection.message_to_binary(binreq, message_type=message_type, request_id=self._request_id)
            self._socket.write(msg)
        return future

    def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server.
        timeout is the timeout written in ua header
        returns the answer body if no callback is provided; when a callback
        is provided nothing is waited for and None is returned
        """
        future = self._send_request(request, callback, timeout, message_type)
        if not callback:
            data = future.result(self.timeout)
            self.check_answer(data, " in response to " + request.__class__.__name__)
            return data

    def check_answer(self, data, context):
        """
        Inspect the type id at the start of an answer.

        context is only used in the warning that is logged for a ServiceFault.
        For a ServiceFault the ServiceResult of its header is checked (which
        may raise) and False is returned if it did not raise; any other
        answer yields True.
        """
        # work on a copy, presumably so parsing does not consume the caller's buffer
        data = data.copy()
        typeid = ua.NodeId.from_binary(data)
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
            self.logger.warning("ServiceFault from server received %s", context)
            hdr = ua.ResponseHeader.from_binary(data)
            hdr.ServiceResult.check()
            return False
        return True

    def _run(self):
        """
        Body of the receiving thread: call _receive until a stop is requested
        or the socket reports that it has been closed.
        """
        self.logger.info("Thread started")
        while not self._do_stop:
            try:
                self._receive()
            except ua.utils.SocketClosedException:
                self.logger.info("Socket has closed connection")
                break
            except UaError:
                # protocol errors are logged and the loop keeps receiving
                self.logger.exception("Protocol Error")
        self.logger.info("Thread ended")

    def _receive(self):
        """
        Read one message from the connection and dispatch it.

        Messages resolve the future registered under their request id,
        an Acknowledge resolves the future registered under id 0 (see
        send_hello), an ErrorMessage is only logged.
        Raises ua.UaError for any other message type.
        """
        msg = self._connection.receive_from_socket(self._socket)
        if msg is None:
            return
        elif isinstance(msg, ua.Message):
            self._call_callback(msg.request_id(), msg.body())
        elif isinstance(msg, ua.Acknowledge):
            self._call_callback(0, msg)
        elif isinstance(msg, ua.ErrorMessage):
            self.logger.warning("Received an error: %s", msg)
        else:
            # exceptions do not %-format their arguments, so build the text here
            raise ua.UaError("Unsupported message type: {0}".format(msg))

    def _call_callback(self, request_id, body):
        """
        Resolve the future registered for request_id with body.

        Raises ua.UaError if no future is registered for that id.
        """
        with self._lock:
            future = self._callbackmap.pop(request_id, None)
            if future is None:
                raise ua.UaError("No future object found for request: {0}, callbacks in list are {1}".format(request_id, self._callbackmap.keys()))
        # resolved outside the lock: done-callbacks run here and may send new requests
        future.set_result(body)

    def _create_request_header(self, timeout=1000):
        """
        Build a request header carrying the current authentication token,
        a freshly incremented request handle and timeout as TimeoutHint.
        """
        hdr = ua.RequestHeader()
        hdr.AuthenticationToken = self.authentication_token
        self._request_handle += 1
        hdr.RequestHandle = self._request_handle
        hdr.TimeoutHint = timeout
        return hdr

    def connect_socket(self, host, port):
        """
        connect to server socket and start receiving thread
        """
        self.logger.info("opening connection")
        sock = socket.create_connection((host, port))
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # nodelay necessary to avoid packing in one frame, some servers do not like it
        self._socket = utils.SocketWrapper(sock)
        self.start()

    def disconnect_socket(self):
        """
        Ask the receiving thread to stop, then shut down and close the socket.
        """
        self.logger.info("stop request")
        self._do_stop = True
        self._socket.socket.shutdown(socket.SHUT_RDWR)
        self._socket.socket.close()

    def send_hello(self, url):
        """
        Send a Hello message for url and wait (up to self.timeout) for the
        answer, which is returned.
        """
        hello = ua.Hello()
        hello.EndpointUrl = url
        future = Future()
        with self._lock:
            # id 0 is where _receive delivers the Acknowledge
            self._callbackmap[0] = future
        binmsg = self._connection.tcp_to_binary(ua.MessageType.Hello, hello)
        self._socket.write(binmsg)
        ack = future.result(self.timeout)
        return ack

    def open_secure_channel(self, params):
        """
        Send an OpenSecureChannelRequest with params, hand the parameters of
        the answer to the connection and return them.
        """
        self.logger.info("open_secure_channel")
        request = ua.OpenSecureChannelRequest()
        request.Parameters = params
        future = self._send_request(request, message_type=ua.MessageType.SecureOpen)

        # FIXME: we have a race condition here
        # we can get a packet with the new token id before we reach to store it..
        response = ua.OpenSecureChannelResponse.from_binary(future.result(self.timeout))
        response.ResponseHeader.ServiceResult.check()
        self._connection.set_channel(response.Parameters)
        return response.Parameters

    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepared to reconnect.
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close socket
        """
        self.logger.info("close_secure_channel")
        request = ua.CloseSecureChannelRequest()
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
        with self._lock:
            # don't expect any more answers
            future.cancel()
            self._callbackmap.clear()

        # some servers send a response here, most do not ... so we ignore
185
186
187 1
class UaClient(object):
188
189
    """
190
    low level OPC-UA client.
191
192
    It implements (almost) all methods defined in opcua spec
193
    taking in argument the structures defined in opcua spec.
194
195
    In this Python implementation  most of the structures are defined in
196
    uaprotocol_auto.py and uaprotocol_hand.py available under opcua.ua
197
    """
198
199 1
    def __init__(self, timeout=1):
        """
        Prepare an unconnected client.

        timeout is kept and later passed to the UASocketClient built in
        connect_socket; it is also used when waiting for subscription answers.
        """
        self.logger = logging.getLogger(__name__)
        self._timeout = timeout
        # stays None until connect_socket() creates the socket client
        self._uasocket = None
        self._security_policy = ua.SecurityPolicy()
        # _publishcallbacks should be accessed in recv thread only
        self._publishcallbacks = {}
206
207 1
    def set_security(self, policy):
208 1
        self._security_policy = policy
209
210 1
    def connect_socket(self, host, port):
        """
        Create a new UASocketClient (using the stored timeout and security
        policy), keep it as the active socket client and connect it to
        host/port, which also starts its receiving thread.
        """
        sock_client = UASocketClient(self._timeout, security_policy=self._security_policy)
        self._uasocket = sock_client
        return sock_client.connect_socket(host, port)
216
217 1
    def disconnect_socket(self):
218 1
        return self._uasocket.disconnect_socket()
219
220 1
    def send_hello(self, url):
221 1
        return self._uasocket.send_hello(url)
222
223 1
    def open_secure_channel(self, params):
224 1
        return self._uasocket.open_secure_channel(params)
225
226 1
    def close_secure_channel(self):
227
        """
228
        close secure channel. It seems to trigger a shutdown of socket
229
        in most servers, so be prepare to reconnect
230
        """
231 1
        return self._uasocket.close_secure_channel()
232
233 1
    def create_session(self, parameters):
        """
        Send a CreateSessionRequest carrying parameters.

        After the response header's ServiceResult has been checked, the
        authentication token of the answer is stored on the socket client
        (it is written into every later request header) and the response
        parameters are returned.
        """
        self.logger.info("create_session")
        req = ua.CreateSessionRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = ua.CreateSessionResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        session = resp.Parameters
        self._uasocket.authentication_token = session.AuthenticationToken
        return resp.Parameters
243
244 1
    def activate_session(self, parameters):
        """
        Send an ActivateSessionRequest carrying parameters and return the
        parameters of the decoded response, once its ServiceResult has been
        checked.
        """
        self.logger.info("activate_session")
        req = ua.ActivateSessionRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = ua.ActivateSessionResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters
253
254 1
    def close_session(self, deletesubscriptions):
        """
        Send a CloseSessionRequest with DeleteSubscriptions set to
        deletesubscriptions. A BadSessionClosed result is ignored; nothing
        is returned.
        """
        self.logger.info("close_session")
        req = ua.CloseSessionRequest()
        req.DeleteSubscriptions = deletesubscriptions
        raw = self._uasocket.send_request(req)
        resp = ua.CloseSessionResponse.from_binary(raw)
        status = resp.ResponseHeader.ServiceResult
        try:
            status.check()
        except BadSessionClosed:
            # Closing the session while publish requests are still open leads
            # to BadSessionClosed responses, so it is simply ignored here.
            # The alternative would be to make sure no publish request is in
            # flight when the session gets closed.
            pass
268
269 1
    def browse(self, parameters):
        """
        Send a BrowseRequest carrying parameters and return the Results of
        the decoded response, once its ServiceResult has been checked.
        """
        self.logger.info("browse")
        req = ua.BrowseRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = ua.BrowseResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
278
279 1
    def browse_next(self, parameters):
        """
        Send a BrowseNextRequest carrying parameters and return
        Parameters.Results of the decoded response, once its ServiceResult
        has been checked.
        """
        self.logger.info("browse next")
        req = ua.BrowseNextRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = ua.BrowseNextResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters.Results
288
289 1
    def read(self, parameters):
        """
        Send a ReadRequest carrying parameters and return the Results of the
        decoded response, once its ServiceResult has been checked.

        Good results for the NodeClass attribute are converted to
        ua.NodeClass, and good ValueRank results in the range -3..4 are
        converted to ua.ValueRank.
        """
        self.logger.info("read")
        req = ua.ReadRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = ua.ReadResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        # some attributes come back as plain numbers: cast them to their Enum;
        # results are matched to the requested nodes by position
        for pos, node_to_read in enumerate(parameters.NodesToRead):
            attr_id = node_to_read.AttributeId
            if attr_id == ua.AttributeIds.NodeClass:
                datavalue = resp.Results[pos]
                if datavalue.StatusCode.is_good():
                    datavalue.Value.Value = ua.NodeClass(datavalue.Value.Value)
            elif attr_id == ua.AttributeIds.ValueRank:
                datavalue = resp.Results[pos]
                if datavalue.StatusCode.is_good() and datavalue.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
                    datavalue.Value.Value = ua.ValueRank(datavalue.Value.Value)
        return resp.Results
308
309 1
    def write(self, params):
        """
        Send a WriteRequest carrying params and return the Results of the
        decoded response, once its ServiceResult has been checked.
        """
        # this used to log "read" (copy-paste from read()), which made
        # write calls indistinguishable from reads in the log
        self.logger.info("write")
        request = ua.WriteRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.WriteResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results
318
319 1
    def get_endpoints(self, params):
        """
        Send a GetEndpointsRequest carrying params and return the Endpoints
        of the decoded response, once its ServiceResult has been checked.
        """
        self.logger.info("get_endpoint")
        req = ua.GetEndpointsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.GetEndpointsResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Endpoints
328
329 1
    def find_servers(self, params):
        """
        Send a FindServersRequest carrying params and return the Servers of
        the decoded response, once its ServiceResult has been checked.
        """
        self.logger.info("find_servers")
        req = ua.FindServersRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.FindServersResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Servers
338
339 1
    def find_servers_on_network(self, params):
        """
        Send a FindServersOnNetworkRequest carrying params and return the
        Parameters of the decoded response, once its ServiceResult has been
        checked.
        """
        self.logger.info("find_servers_on_network")
        req = ua.FindServersOnNetworkRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.FindServersOnNetworkResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters
348
349 1
    def register_server(self, registered_server):
        """
        Send a RegisterServerRequest whose Server is registered_server and
        check the ServiceResult of the decoded response.

        This service has nothing to return.
        """
        self.logger.info("register_server")
        req = ua.RegisterServerRequest()
        req.Server = registered_server
        raw = self._uasocket.send_request(req)
        resp = ua.RegisterServerResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
358
359 1
    def register_server2(self, params):
        """
        Send a RegisterServer2Request carrying params and return the
        ConfigurationResults of the decoded response, once its ServiceResult
        has been checked.
        """
        self.logger.info("register_server2")
        req = ua.RegisterServer2Request()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.RegisterServer2Response.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.ConfigurationResults
368
369 1
    def translate_browsepaths_to_nodeids(self, browsepaths):
        """
        Send a TranslateBrowsePathsToNodeIdsRequest whose BrowsePaths is
        browsepaths and return the Results of the decoded response, once its
        ServiceResult has been checked.
        """
        self.logger.info("translate_browsepath_to_nodeid")
        req = ua.TranslateBrowsePathsToNodeIdsRequest()
        req.Parameters.BrowsePaths = browsepaths
        raw = self._uasocket.send_request(req)
        resp = ua.TranslateBrowsePathsToNodeIdsResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
378
379 1
    def create_subscription(self, params, callback):
        """
        Send a CreateSubscriptionRequest carrying params.

        The answer is handled by _create_subscription_callback, which
        registers callback for the new subscription id and resolves the
        future this method waits on (for at most the client timeout).
        Returns the value that future was resolved with.
        """
        self.logger.info("create_subscription")
        req = ua.CreateSubscriptionRequest()
        req.Parameters = params
        answer = Future()
        on_response = partial(self._create_subscription_callback, callback, answer)
        self._uasocket.send_request(req, on_response)
        return answer.result(self._timeout)
387
388 1
    def _create_subscription_callback(self, pub_callback, resp_fut, data_fut):
        """
        Done-callback for the CreateSubscription answer held by data_fut.

        Decodes and checks the response, registers pub_callback under the
        new subscription id and resolves resp_fut with the response
        parameters.
        """
        self.logger.info("_create_subscription_callback")
        raw = data_fut.result()
        resp = ua.CreateSubscriptionResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        subscription_id = resp.Parameters.SubscriptionId
        self._publishcallbacks[subscription_id] = pub_callback
        resp_fut.set_result(resp.Parameters)
396
397 1
    def delete_subscriptions(self, subscriptionids):
        """
        Send a DeleteSubscriptionsRequest for subscriptionids.

        The answer is handled by _delete_subscriptions_callback, which
        resolves the future this method waits on (for at most the client
        timeout). Returns the value that future was resolved with.
        """
        self.logger.info("delete_subscription")
        req = ua.DeleteSubscriptionsRequest()
        req.Parameters.SubscriptionIds = subscriptionids
        answer = Future()
        on_response = partial(self._delete_subscriptions_callback, subscriptionids, answer)
        self._uasocket.send_request(req, on_response)
        return answer.result(self._timeout)
405
406 1
    def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut):
        """
        Done-callback for the DeleteSubscriptions answer held by data_fut.

        Decodes and checks the response, forgets the publish callbacks of
        subscriptionids and resolves resp_fut with the response Results.
        """
        self.logger.info("_delete_subscriptions_callback")
        data = data_fut.result()
        response = ua.DeleteSubscriptionsResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        for sid in subscriptionids:
            # tolerate ids without a registered callback: a KeyError raised
            # here would keep resp_fut from ever being resolved, leaving the
            # caller of delete_subscriptions to run into its timeout
            self._publishcallbacks.pop(sid, None)
        resp_fut.set_result(response.Results)
415
416 1
    def publish(self, acks=None):
        """
        Send a PublishRequest acknowledging acks (an empty list when acks is
        None). The answer is handled asynchronously by
        _call_publish_callback; nothing is returned.
        """
        self.logger.info("publish")
        acknowledgements = [] if acks is None else acks
        req = ua.PublishRequest()
        req.Parameters.SubscriptionAcknowledgements = acknowledgements
        # timeout could be set to 0 (= no timeout) but some servers do not support it
        self._uasocket.send_request(req, self._call_publish_callback, timeout=int(9e8)) # 250 days
424
425 1
    def _call_publish_callback(self, future):
        """
        Done-callback for a publish answer held by future.

        Checks the answer, decodes it as a PublishResponse and hands its
        parameters to the callback registered for the subscription id.
        A BadTimeout answer triggers a new publish request, a
        BadNoSubscription answer is ignored, and data for an unknown
        subscription is only logged. Exceptions raised by the user callback
        are logged and not propagated.
        """
        self.logger.info("call_publish_callback")
        data = future.result()

        # check if answer looks ok
        try:
            self._uasocket.check_answer(data, "while waiting for publish response")
        except BadTimeout: # Spec Part 4, 7.28
            self.publish()
            return
        except BadNoSubscription: # Spec Part 5, 13.8.1
            # BadNoSubscription is expected after deleting the last subscription.
            #
            # We should therefore also check for len(self._publishcallbacks) == 0, but
            # this gets us into trouble if a Publish response arrives before the
            # DeleteSubscription response.
            #
            # We could remove the callback already when sending the DeleteSubscription request,
            # but there are some legitimate reasons to keep them around, such as when the server
            # responds with "BadTimeout" and we should try again later instead of just removing
            # the subscription client-side.
            #
            # There are a variety of ways to act correctly, but the most practical solution seems
            # to be to just ignore any BadNoSubscription responses.
            self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
            return

        # parse publish response
        try:
            response = ua.PublishResponse.from_binary(data)
            self.logger.debug(response)
        except Exception:
            # INFO: catching the exception here might be obsolete because we already
            #       catch BadTimeout above. However, it's not really clear what this code
            #       does so it stays in, doesn't seem to hurt.
            self.logger.exception("Error parsing notificatipn from server")
            self.publish([]) # send publish request to server so it does not stop sending notifications
            return

        # look for callback
        try:
            callback = self._publishcallbacks[response.Parameters.SubscriptionId]
        except KeyError:
            self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
            return

        # do callback
        try:
            callback(response.Parameters)
        except Exception:  # we call client code, catch everything!
            # the message has a %s placeholder, so it needs an argument;
            # without one a literal "%s" ended up in the log
            self.logger.exception("Exception while calling user callback: %s", callback)
476
477 1
    def create_monitored_items(self, params):
        """
        Send a CreateMonitoredItemsRequest carrying params and return the
        Results of the decoded response, once its ServiceResult has been
        checked.
        """
        self.logger.info("create_monitored_items")
        req = ua.CreateMonitoredItemsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.CreateMonitoredItemsResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
486
487 1
    def delete_monitored_items(self, params):
        """
        Send a DeleteMonitoredItemsRequest carrying params and return the
        Results of the decoded response, once its ServiceResult has been
        checked.
        """
        self.logger.info("delete_monitored_items")
        req = ua.DeleteMonitoredItemsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.DeleteMonitoredItemsResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
496
497 1
    def add_nodes(self, nodestoadd):
        """
        Send an AddNodesRequest whose NodesToAdd is nodestoadd and return the
        Results of the decoded response, once its ServiceResult has been
        checked.
        """
        self.logger.info("add_nodes")
        req = ua.AddNodesRequest()
        req.Parameters.NodesToAdd = nodestoadd
        raw = self._uasocket.send_request(req)
        resp = ua.AddNodesResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
506
507 1
    def add_references(self, refs):
        """
        Send an AddReferencesRequest whose ReferencesToAdd is refs and return
        the Results of the decoded response, once its ServiceResult has been
        checked.
        """
        self.logger.info("add_references")
        req = ua.AddReferencesRequest()
        req.Parameters.ReferencesToAdd = refs
        raw = self._uasocket.send_request(req)
        resp = ua.AddReferencesResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
516
517 1
    def delete_references(self, refs):
        """
        Send a DeleteReferencesRequest whose ReferencesToDelete is refs and
        return Parameters.Results of the decoded response, once its
        ServiceResult has been checked.
        """
        self.logger.info("delete")
        req = ua.DeleteReferencesRequest()
        req.Parameters.ReferencesToDelete = refs
        raw = self._uasocket.send_request(req)
        resp = ua.DeleteReferencesResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters.Results
526
527
528 1
    def delete_nodes(self, params):
        """
        Send a DeleteNodesRequest carrying params and return the Results of
        the decoded response, once its ServiceResult has been checked.
        """
        self.logger.info("delete_nodes")
        req = ua.DeleteNodesRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.DeleteNodesResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
537
538 1
    def call(self, methodstocall):
        """
        Send a CallRequest whose MethodsToCall is methodstocall and return
        the Results of the decoded response, once its ServiceResult has been
        checked.
        """
        req = ua.CallRequest()
        req.Parameters.MethodsToCall = methodstocall
        raw = self._uasocket.send_request(req)
        resp = ua.CallResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
546
547 1
    def history_read(self, params):
        """
        Send a HistoryReadRequest carrying params and return the Results of
        the decoded response, once its ServiceResult has been checked.
        """
        self.logger.info("history_read")
        req = ua.HistoryReadRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.HistoryReadResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
556
557 1
    def modify_monitored_items(self, params):
        """
        Send a ModifyMonitoredItemsRequest carrying params and return the
        Results of the decoded response, once its ServiceResult has been
        checked.
        """
        self.logger.info("modify_monitored_items")
        req = ua.ModifyMonitoredItemsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.ModifyMonitoredItemsResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
566