Completed
Pull Request — master (#256)
by
unknown
04:27
created

UaClient.write()   A

Complexity

Conditions 1

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 9
ccs 8
cts 8
cp 1
crap 1
rs 9.6666
c 0
b 0
f 0
1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9 1
from functools import partial
10
11 1
from opcua import ua
12 1
from opcua.common import utils
13
from opcua.common.uaerrors import UaError
14
from opcua.common.uaerrors import UaStatusCodeError
15 1
from opcua.common.uaerrors import BadNoSubscription, BadSessionClosed
16
17
18
class UASocketClient(object):
19
    """
20 1
    handle socket connection and send ua messages
21 1
    timeout is the timeout used while waiting for an ua answer from server
22 1
    """
23 1
    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy()):
24 1
        self.logger = logging.getLogger(__name__ + ".Socket")
25 1
        self._thread = None
26 1
        self._lock = Lock()
27 1
        self.timeout = timeout
28 1
        self._socket = None
29 1
        self._do_stop = False
30 1
        self.authentication_token = ua.NodeId()
31 1
        self._request_id = 0
32
        self._request_handle = 0
33 1
        self._callbackmap = {}
34
        self._connection = ua.SecureConnection(security_policy)
35
36
    def start(self):
37
        """
38
        Start receiving thread.
39 1
        this is called automatically in connect and
40 1
        should not be necessary to call directly
41
        """
42 1
        self._thread = Thread(target=self._run)
43
        self._thread.start()
44
45
    def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
46
        """
47
        send request to server, lower-level method
48 1
        timeout is the timeout written in ua header
49 1
        returns future
50 1
        """
51 1
        with self._lock:
52 1
            request.RequestHeader = self._create_request_header(timeout)
53
            self.logger.debug("Sending: %s", request)
54
            try:
55
                binreq = request.to_binary()
56
            except:
57
                # reset reqeust handle if any error
58 1
                # see self._create_request_header
59 1
                self._request_handle -= 1
60 1
                raise
61 1
            self._request_id += 1
62 1
            future = Future()
63 1
            if callback:
64 1
                future.add_done_callback(callback)
65 1
            self._callbackmap[self._request_id] = future
66
            msg = self._connection.message_to_binary(binreq, message_type=message_type, request_id=self._request_id)
67 1
            self._socket.write(msg)
68
        return future
69
70
    def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
71
        """
72
        send request to server.
73 1
        timeout is the timeout written in ua header
74 1
        returns response object if no callback is provided
75 1
        """
76 1
        future = self._send_request(request, callback, timeout, message_type)
77 1
        if not callback:
78
            data = future.result(self.timeout)
79 1
            self.check_answer(data, " in response to " + request.__class__.__name__)
80 1
            return data
81 1
82 1
    def check_answer(self, data, context):
83 1
        data = data.copy()
84 1
        typeid = ua.NodeId.from_binary(data)
85 1
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
86
            self.logger.warning("ServiceFault from server received: %s", context)
87 1
            hdr = ua.ResponseHeader.from_binary(data)
88
            hdr.ServiceResult.check()
89 1
            return False
90 1
        return True
91 1
92 1
    def _run(self):
93 1
        self.logger.info("Thread started")
94 1
        while not self._do_stop:
95 1
            try:
96 1
                self._receive()
97 1
            except ua.utils.SocketClosedException:
98
                self.logger.info("Socket has closed connection")
99 1
                break
100 1
            except UaError:
101 1
                self.logger.exception("Protocol Error")
102
        self.logger.info("Thread ended")
103 1
104 1
    def _receive(self):
105 1
        msg = self._connection.receive_from_socket(self._socket)
106 1
        if msg is None:
107
            return
108
        elif isinstance(msg, ua.Message):
109
            self._call_callback(msg.request_id(), msg.body())
110
        elif isinstance(msg, ua.Acknowledge):
111
            self._call_callback(0, msg)
112 1
        elif isinstance(msg, ua.ErrorMessage):
113 1
            self.logger.warning("Received an error: %s", msg)
114 1
        else:
115 1
            raise ua.UaError("Unsupported message type: %s", msg)
116
117 1
    def _call_callback(self, request_id, body):
118
        with self._lock:
119 1
            future = self._callbackmap.pop(request_id, None)
120 1
            if future is None:
121 1
                raise ua.UaError("No future object found for request: {}, callbacks in list are {}".format(request_id, self._callbackmap.keys()))
122 1
        future.set_result(body)
123 1
124 1
    def _create_request_header(self, timeout=1000):
125 1
        hdr = ua.RequestHeader()
126
        hdr.AuthenticationToken = self.authentication_token
127 1
        self._request_handle += 1
128
        hdr.RequestHandle = self._request_handle
129
        hdr.TimeoutHint = timeout
130
        return hdr
131 1
132 1
    def connect_socket(self, host, port):
133 1
        """
134 1
        connect to server socket and start receiving thread
135 1
        """
136
        self.logger.info("opening connection")
137 1
        sock = socket.create_connection((host, port))
138 1
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # nodelay ncessary to avoid packing in one frame, some servers do not like it
139 1
        self._socket = utils.SocketWrapper(sock)
140 1
        self.start()
141 1
142
    def disconnect_socket(self):
143 1
        self.logger.info("stop request")
144 1
        self._do_stop = True
145 1
        self._socket.socket.shutdown(socket.SHUT_WR)
146 1
        self._socket.socket.close()
147 1
148 1
    def send_hello(self, url):
149 1
        hello = ua.Hello()
150 1
        hello.EndpointUrl = url
151 1
        future = Future()
152 1
        with self._lock:
153
            self._callbackmap[0] = future
154 1
        binmsg = self._connection.tcp_to_binary(ua.MessageType.Hello, hello)
155 1
        self._socket.write(binmsg)
156 1
        ack = future.result(self.timeout)
157 1
        return ack
158 1
159
    def open_secure_channel(self, params):
160 1
        self.logger.info("open_secure_channel")
161 1
        request = ua.OpenSecureChannelRequest()
162 1
        request.Parameters = params
163 1
        future = self._send_request(request, message_type=ua.MessageType.SecureOpen)
164
165 1
        response = ua.OpenSecureChannelResponse.from_binary(future.result(self.timeout))
166
        response.ResponseHeader.ServiceResult.check()
167
        self._connection.set_channel(response.Parameters)
168
        return response.Parameters
169
170
    def close_secure_channel(self):
171 1
        """
172 1
        close secure channel. It seems to trigger a shutdown of socket
173 1
        in most servers, so be prepare to reconnect.
174 1
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close socket
175
        """
176 1
        self.logger.info("close_secure_channel")
177 1
        request = ua.CloseSecureChannelRequest()
178
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
179
        with self._lock:
180
            # don't expect any more answers
181
            future.cancel()
182 1
            self._callbackmap.clear()
183
184
        # some servers send a response here, most do not ... so we ignore
185
186
187
class UaClient(object):
188
189
    """
190
    low level OPC-UA client.
191
192
    It implements (almost) all methods defined in opcua spec
193
    taking in argument the structures defined in opcua spec.
194 1
195 1
    In this Python implementation  most of the structures are defined in
196
    uaprotocol_auto.py and uaprotocol_hand.py available under opcua.ua
197 1
    """
198 1
199 1
    def __init__(self, timeout=1):
200 1
        self.logger = logging.getLogger(__name__)
201
        # _publishcallbacks should be accessed in recv thread only
202 1
        self._publishcallbacks = {}
203 1
        self._timeout = timeout
204
        self._uasocket = None
205 1
        self._security_policy = ua.SecurityPolicy()
206
207
    def set_security(self, policy):
208
        self._security_policy = policy
209 1
210 1
    def connect_socket(self, host, port):
211
        """
212 1
        connect to server socket and start receiving thread
213 1
        """
214
        self._uasocket = UASocketClient(self._timeout, security_policy=self._security_policy)
215 1
        return self._uasocket.connect_socket(host, port)
216 1
217
    def disconnect_socket(self):
218 1
        return self._uasocket.disconnect_socket()
219 1
220
    def send_hello(self, url):
221 1
        return self._uasocket.send_hello(url)
222
223
    def open_secure_channel(self, params):
224
        return self._uasocket.open_secure_channel(params)
225
226 1
    def close_secure_channel(self):
227
        """
228 1
        close secure channel. It seems to trigger a shutdown of socket
229 1
        in most servers, so be prepare to reconnect
230 1
        """
231 1
        return self._uasocket.close_secure_channel()
232 1
233 1
    def create_session(self, parameters):
234 1
        self.logger.info("create_session")
235 1
        request = ua.CreateSessionRequest()
236 1
        request.Parameters = parameters
237 1
        data = self._uasocket.send_request(request)
238
        response = ua.CreateSessionResponse.from_binary(data)
239 1
        self.logger.debug(response)
240 1
        response.ResponseHeader.ServiceResult.check()
241 1
        self._uasocket.authentication_token = response.Parameters.AuthenticationToken
242 1
        return response.Parameters
243 1
244 1
    def activate_session(self, parameters):
245 1
        self.logger.info("activate_session")
246 1
        request = ua.ActivateSessionRequest()
247 1
        request.Parameters = parameters
248
        data = self._uasocket.send_request(request)
249 1
        response = ua.ActivateSessionResponse.from_binary(data)
250 1
        self.logger.debug(response)
251 1
        response.ResponseHeader.ServiceResult.check()
252 1
        return response.Parameters
253 1
254 1
    def close_session(self, deletesubscriptions):
255
        self.logger.info("close_session")
256
        request = ua.CloseSessionRequest()
257 1
        request.DeleteSubscriptions = deletesubscriptions
258 1
        data = self._uasocket.send_request(request)
259 1
        response = ua.CloseSessionResponse.from_binary(data)
260 1
        try:
261 1
            response.ResponseHeader.ServiceResult.check()
262 1
        except BadSessionClosed:
263 1
            # Problem: closing the session with open publish requests leads to BadSessionClosed responses
264 1
            #          we can just ignore it therefore.
265 1
            #          Alternatively we could make sure that there are no publish requests in flight when
266
            #          closing the session.
267 1
            pass
268 1
269 1
    def browse(self, parameters):
270 1
        self.logger.info("browse")
271 1
        request = ua.BrowseRequest()
272 1
        request.Parameters = parameters
273 1
        data = self._uasocket.send_request(request)
274 1
        response = ua.BrowseResponse.from_binary(data)
275
        self.logger.debug(response)
276 1
        response.ResponseHeader.ServiceResult.check()
277 1
        return response.Results
278
279
    def read(self, parameters):
280
        self.logger.info("read")
281 1
        request = ua.ReadRequest()
282 1
        request.Parameters = parameters
283 1
        data = self._uasocket.send_request(request)
284 1
        response = ua.ReadResponse.from_binary(data)
285 1
        self.logger.debug(response)
286
        response.ResponseHeader.ServiceResult.check()
287 1
        # cast to Enum attributes that need to
288 1
        for idx, rv in enumerate(parameters.NodesToRead):
289 1
            if rv.AttributeId == ua.AttributeIds.NodeClass:
290 1
                dv = response.Results[idx]
291 1
                if dv.StatusCode.is_good():
292 1
                    dv.Value.Value = ua.NodeClass(dv.Value.Value)
293 1
            elif rv.AttributeId == ua.AttributeIds.ValueRank:
294 1
                dv = response.Results[idx]
295 1
                if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
296
                    dv.Value.Value = ua.ValueRank(dv.Value.Value)
297 1
        return response.Results
298 1
299 1
    def write(self, params):
300 1
        self.logger.info("read")
301 1
        request = ua.WriteRequest()
302 1
        request.Parameters = params
303 1
        data = self._uasocket.send_request(request)
304 1
        response = ua.WriteResponse.from_binary(data)
305 1
        self.logger.debug(response)
306
        response.ResponseHeader.ServiceResult.check()
307 1
        return response.Results
308 1
309 1
    def get_endpoints(self, params):
310 1
        self.logger.info("get_endpoint")
311 1
        request = ua.GetEndpointsRequest()
312 1
        request.Parameters = params
313 1
        data = self._uasocket.send_request(request)
314 1
        response = ua.GetEndpointsResponse.from_binary(data)
315 1
        self.logger.debug(response)
316
        response.ResponseHeader.ServiceResult.check()
317 1
        return response.Endpoints
318
319
    def find_servers(self, params):
320
        self.logger.info("find_servers")
321
        request = ua.FindServersRequest()
322
        request.Parameters = params
323
        data = self._uasocket.send_request(request)
324
        response = ua.FindServersResponse.from_binary(data)
325
        self.logger.debug(response)
326
        response.ResponseHeader.ServiceResult.check()
327 1
        return response.Servers
328 1
329 1
    def find_servers_on_network(self, params):
330 1
        self.logger.info("find_servers_on_network")
331 1
        request = ua.FindServersOnNetworkRequest()
332 1
        request.Parameters = params
333 1
        data = self._uasocket.send_request(request)
334 1
        response = ua.FindServersOnNetworkResponse.from_binary(data)
335
        self.logger.debug(response)
336
        response.ResponseHeader.ServiceResult.check()
337 1
        return response.Parameters
338
339
    def register_server(self, registered_server):
340
        self.logger.info("register_server")
341
        request = ua.RegisterServerRequest()
342
        request.Server = registered_server
343
        data = self._uasocket.send_request(request)
344
        response = ua.RegisterServerResponse.from_binary(data)
345
        self.logger.debug(response)
346
        response.ResponseHeader.ServiceResult.check()
347 1
        # nothing to return for this service
348 1
349 1
    def register_server2(self, params):
350 1
        self.logger.info("register_server2")
351 1
        request = ua.RegisterServer2Request()
352 1
        request.Parameters = params
353 1
        data = self._uasocket.send_request(request)
354 1
        response = ua.RegisterServer2Response.from_binary(data)
355 1
        self.logger.debug(response)
356
        response.ResponseHeader.ServiceResult.check()
357 1
        return response.ConfigurationResults
358 1
359 1
    def translate_browsepaths_to_nodeids(self, browsepaths):
360 1
        self.logger.info("translate_browsepath_to_nodeid")
361 1
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
362 1
        request.Parameters.BrowsePaths = browsepaths
363 1
        data = self._uasocket.send_request(request)
364 1
        response = ua.TranslateBrowsePathsToNodeIdsResponse.from_binary(data)
365
        self.logger.debug(response)
366 1
        response.ResponseHeader.ServiceResult.check()
367 1
        return response.Results
368 1
369 1
    def create_subscription(self, params, callback):
370 1
        self.logger.info("create_subscription")
371 1
        request = ua.CreateSubscriptionRequest()
372 1
        request.Parameters = params
373 1
        resp_fut = Future()
374
        mycallbak = partial(self._create_subscription_callback, callback, resp_fut)
375 1
        self._uasocket.send_request(request, mycallbak)
376 1
        return resp_fut.result(self._timeout)
377 1
378 1
    def _create_subscription_callback(self, pub_callback, resp_fut, data_fut):
379 1
        self.logger.info("_create_subscription_callback")
380 1
        data = data_fut.result()
381 1
        response = ua.CreateSubscriptionResponse.from_binary(data)
382 1
        self.logger.debug(response)
383
        response.ResponseHeader.ServiceResult.check()
384 1
        self._publishcallbacks[response.Parameters.SubscriptionId] = pub_callback
385 1
        resp_fut.set_result(response.Parameters)
386 1
387 1
    def delete_subscriptions(self, subscriptionids):
388 1
        self.logger.info("delete_subscription")
389 1
        request = ua.DeleteSubscriptionsRequest()
390 1
        request.Parameters.SubscriptionIds = subscriptionids
391 1
        resp_fut = Future()
392 1
        mycallbak = partial(self._delete_subscriptions_callback, subscriptionids, resp_fut)
393
        self._uasocket.send_request(request, mycallbak)
394 1
        return resp_fut.result(self._timeout)
395 1
396 1
    def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut):
397 1
        self.logger.info("_delete_subscriptions_callback")
398 1
        data = data_fut.result()
399 1
        response = ua.DeleteSubscriptionsResponse.from_binary(data)
400 1
        self.logger.debug(response)
401
        response.ResponseHeader.ServiceResult.check()
402 1
        for sid in subscriptionids:
403 1
            self._publishcallbacks.pop(sid)
404 1
        resp_fut.set_result(response.Results)
405 1
406 1
    def publish(self, acks=None):
407 1
        self.logger.info("publish")
408 1
        if acks is None:
409
            acks = []
410
        request = ua.PublishRequest()
411
        request.Parameters.SubscriptionAcknowledgements = acks
412
        self._uasocket.send_request(request, self._call_publish_callback, timeout=int(9e8))  # timeout could be set to 0 but some servers to not support it
413 1
414
    def _call_publish_callback(self, future):
415
        self.logger.info("call_publish_callback")
416 1
        data = future.result()
417 1
418 1
        # check if answer looks ok
419
        try:
420
            self._uasocket.check_answer(data, "during waiting for publish response")
421
        except BadNoSubscription:
422 1
            # NOTE: check_answer() logs the answer even though we catch the exception
423 1
424 1
            self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
425 1
            # Spec Part 5, Paragraph 13.8.1
426 1
            # BadNoSubscription is expected after deleting the last subscription.
427 1
            #
428 1
            # We should therefore also check for len(self._publishcallbacks) == 0, but
429 1
            # this gets us into trouble if a Publish response arrives before the
430 1
            # DeleteSubscription response.
431
            #
432 1
            # We could remove the callback already when sending the DeleteSubscription request,
433 1
            # but there are some legitimate reasons to keep them around, such as when the server
434 1
            # responds with "BadTimeout" and we should try again later instead of just removing
435 1
            # the subscription client-side.
436 1
            #
437 1
            # There are a variety of ways to act correctly, but the most practical solution seems
438 1
            # to be to just ignore any BadNoSubscription responses.
439 1
            return
440 1
441
        # parse publish response
442 1
        try:
443 1
            response = ua.PublishResponse.from_binary(data)
444 1
            self.logger.debug(response)
445 1
        except Exception:
446 1
            self.logger.exception("Error parsing notificatipn from server")
447 1
            self.publish([]) #send publish request ot server so he does stop sending notifications
448 1
            return
449 1
450 1
        # look for callback
451
        try:
452 1
            callback = self._publishcallbacks[response.Parameters.SubscriptionId]
453 1
        except KeyError:
454 1
            self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
455 1
            return
456 1
457 1
        # do callback
458 1
        try:
459 1
            callback(response.Parameters)
460 1
        except Exception:  # we call client code, catch everything!
461
            self.logger.exception("Exception while calling user callback: %s")
462 1
463 1
    def create_monitored_items(self, params):
464 1
        self.logger.info("create_monitored_items")
465 1
        request = ua.CreateMonitoredItemsRequest()
466 1
        request.Parameters = params
467 1
        data = self._uasocket.send_request(request)
468 1
        response = ua.CreateMonitoredItemsResponse.from_binary(data)
469 1
        self.logger.debug(response)
470
        response.ResponseHeader.ServiceResult.check()
471 1
        return response.Results
472
473
    def delete_monitored_items(self, params):
474
        self.logger.info("delete_monitored_items")
475
        request = ua.DeleteMonitoredItemsRequest()
476
        request.Parameters = params
477
        data = self._uasocket.send_request(request)
478
        response = ua.DeleteMonitoredItemsResponse.from_binary(data)
479
        self.logger.debug(response)
480
        response.ResponseHeader.ServiceResult.check()
481
        return response.Results
482
483
    def add_nodes(self, nodestoadd):
484
        self.logger.info("add_nodes")
485
        request = ua.AddNodesRequest()
486
        request.Parameters.NodesToAdd = nodestoadd
487
        data = self._uasocket.send_request(request)
488
        response = ua.AddNodesResponse.from_binary(data)
489
        self.logger.debug(response)
490
        response.ResponseHeader.ServiceResult.check()
491
        return response.Results
492
493
    def delete_nodes(self, nodestodelete):
494
        self.logger.info("delete_nodes")
495
        request = ua.DeleteNodesRequest()
496
        request.Parameters.NodesToDelete = nodestodelete
497
        data = self._uasocket.send_request(request)
498
        response = ua.DeleteNodesResponse.from_binary(data)
499
        self.logger.debug(response)
500
        response.ResponseHeader.ServiceResult.check()
501
        return response.Results
502
503
    def call(self, methodstocall):
504
        request = ua.CallRequest()
505
        request.Parameters.MethodsToCall = methodstocall
506
        data = self._uasocket.send_request(request)
507
        response = ua.CallResponse.from_binary(data)
508
        self.logger.debug(response)
509
        response.ResponseHeader.ServiceResult.check()
510
        return response.Results
511
512
    def history_read(self, params):
513
        self.logger.info("history_read")
514
        request = ua.HistoryReadRequest()
515
        request.Parameters = params
516
        data = self._uasocket.send_request(request)
517
        response = ua.HistoryReadResponse.from_binary(data)
518
        self.logger.debug(response)
519
        response.ResponseHeader.ServiceResult.check()
520
        return response.Results
521
522
    def modify_monitored_items(self, params):
523
        self.logger.info("modify_monitored_items")
524
        request = ua.ModifyMonitoredItemsRequest()
525
        request.Parameters = params
526
        data = self._uasocket.send_request(request)
527
        response = ua.ModifyMonitoredItemsResponse.from_binary(data)
528
        self.logger.debug(response)
529
        response.ResponseHeader.ServiceResult.check()
530
        return response.Results
531