Completed
Push — master ( a5d685...6787aa )
by Olivier
02:20
created

opcua.client.UaClient.connect_socket()   A

Complexity

Conditions 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1
Metric Value
cc 1
dl 0
loc 6
ccs 3
cts 3
cp 1
crap 1
rs 9.4285
1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9 1
from functools import partial
10
11 1
from opcua import ua
12 1
from opcua.common import utils
13
14
15 1
class UASocketClient(object):
16
    """
17
    handle socket connection and send ua messages
18
    timeout is the timeout used while waiting for an ua answer from server
19
    """
20 1
    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy()):
21 1
        self.logger = logging.getLogger(__name__ + "Socket")
22 1
        self._thread = None
23 1
        self._lock = Lock()
24 1
        self.timeout = timeout
25 1
        self._socket = None
26 1
        self._do_stop = False
27 1
        self.authentication_token = ua.NodeId()
28 1
        self._request_id = 0
29 1
        self._request_handle = 0
30 1
        self._callbackmap = {}
31 1
        self._connection = ua.SecureConnection(security_policy)
32
33 1
    def start(self):
34
        """
35
        Start receiving thread.
36
        this is called automatically in connect and
37
        should not be necessary to call directly
38
        """
39 1
        self._thread = Thread(target=self._run)
40 1
        self._thread.start()
41
42 1
    def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
43
        """
44
        send request to server, lower-level method
45
        timeout is the timeout written in ua header
46
        returns future
47
        """
48 1
        with self._lock:
49 1
            request.RequestHeader = self._create_request_header(timeout)
50 1
            try:
51 1
                binreq = request.to_binary()
52
            except:
53
                # reset reqeust handle if any error
54
                # see self._create_request_header
55
                self._request_handle -= 1
56
                raise
57 1
            self._request_id += 1
58 1
            future = Future()
59 1
            if callback:
60 1
                future.add_done_callback(callback)
61 1
            self._callbackmap[self._request_id] = future
62 1
            msg = self._connection.message_to_binary(binreq, message_type, self._request_id)
63 1
            self._socket.write(msg)
64 1
        return future
65
66 1
    def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
67
        """
68
        send request to server.
69
        timeout is the timeout written in ua header
70
        returns response object if no callback is provided
71
        """
72 1
        future = self._send_request(request, callback, timeout, message_type)
73 1
        if not callback:
74 1
            data = future.result(self.timeout)
75 1
            self.check_answer(data, " in response to " + request.__class__.__name__)
76 1
            return data
77
78 1
    def check_answer(self, data, context):
79 1
        data = data.copy()
80 1
        typeid = ua.NodeId.from_binary(data)
81 1
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
82 1
            self.logger.warning("ServiceFault from server received %s", context)
83 1
            hdr = ua.ResponseHeader.from_binary(data)
84 1
            hdr.ServiceResult.check()
85
            return False
86 1
        return True
87
88 1
    def _run(self):
89 1
        self.logger.info("Thread started")
90 1
        while not self._do_stop:
91 1
            try:
92 1
                self._receive()
93 1
            except ua.utils.SocketClosedException:
94 1
                self.logger.info("Socket has closed connection")
95 1
                break
96 1
        self.logger.info("Thread ended")
97
98 1
    def _receive(self):
99 1
        msg = self._connection.receive_from_socket(self._socket)
100 1
        if msg is None:
101
            return
102 1
        elif isinstance(msg, ua.Message):
103 1
            self._call_callback(msg.request_id(), msg.body())
104 1
        elif isinstance(msg, ua.Acknowledge):
105 1
            self._call_callback(0, msg)
106
        elif isinstance(msg, ua.ErrorMessage):
107
            self.logger.warning("Received an error: {}".format(msg))
108
        else:
109
            raise ua.UaError("Unsupported message type: {}".format(msg))
110
111 1
    def _call_callback(self, request_id, body):
112 1
        with self._lock:
113 1
            future = self._callbackmap.pop(request_id, None)
114 1
            if future is None:
115
                raise ua.UaError("No future object found for request: {}, callbacks in list are {}".format(request_id, self._callbackmap.keys()))
116 1
        future.set_result(body)
117
118 1
    def _create_request_header(self, timeout=1000):
119 1
        hdr = ua.RequestHeader()
120 1
        hdr.AuthenticationToken = self.authentication_token
121 1
        self._request_handle += 1
122 1
        hdr.RequestHandle = self._request_handle
123 1
        hdr.TimeoutHint = timeout
124 1
        return hdr
125
126 1
    def connect_socket(self, host, port):
127
        """
128
        connect to server socket and start receiving thread
129
        """
130 1
        self.logger.info("opening connection")
131 1
        sock = socket.create_connection((host, port))
132 1
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # nodelay ncessary to avoid packing in one frame, some servers do not like it
133 1
        self._socket = utils.SocketWrapper(sock)
134 1
        self.start()
135
136 1
    def disconnect_socket(self):
137 1
        self.logger.info("stop request")
138 1
        self._do_stop = True
139 1
        self._socket.socket.shutdown(socket.SHUT_WR)
140 1
        self._socket.socket.close()
141
142 1
    def send_hello(self, url):
143 1
        hello = ua.Hello()
144 1
        hello.EndpointUrl = url
145 1
        future = Future()
146 1
        with self._lock:
147 1
            self._callbackmap[0] = future
148 1
        binmsg = self._connection.tcp_to_binary(ua.MessageType.Hello, hello)
149 1
        self._socket.write(binmsg)
150 1
        ack = future.result(self.timeout)
151 1
        return ack
152
153 1
    def open_secure_channel(self, params):
154 1
        self.logger.info("open_secure_channel")
155 1
        request = ua.OpenSecureChannelRequest()
156 1
        request.Parameters = params
157 1
        future = self._send_request(request, message_type=ua.MessageType.SecureOpen)
158
159 1
        response = ua.OpenSecureChannelResponse.from_binary(future.result(self.timeout))
160 1
        response.ResponseHeader.ServiceResult.check()
161 1
        self._connection.set_security_token(response.Parameters.SecurityToken)
162 1
        return response.Parameters
163
164 1
    def close_secure_channel(self):
165
        """
166
        close secure channel. It seems to trigger a shutdown of socket
167
        in most servers, so be prepare to reconnect.
168
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close socket
169
        """
170 1
        self.logger.info("close_secure_channel")
171 1
        request = ua.CloseSecureChannelRequest()
172 1
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
173 1
        with self._lock:
174
            # don't expect any more answers
175 1
            future.cancel()
176 1
            self._callbackmap.clear()
177
178
        # some servers send a response here, most do not ... so we ignore
179
180
181 1
class UaClient(object):
182
183
    """
184
    low level OPC-UA client.
185
186
    It implements (almost) all methods defined in opcua spec
187
    taking in argument the structures defined in opcua spec.
188
189
    In this Python implementation  most of the structures are defined in
190
    uaprotocol_auto.py and uaprotocol_hand.py available under opcua.ua
191
    """
192
193 1
    def __init__(self, timeout=1):
194 1
        self.logger = logging.getLogger(__name__)
195
        # _publishcallbacks should be accessed in recv thread only
196 1
        self._publishcallbacks = {}
197 1
        self._timeout = timeout
198 1
        self._uasocket = None
199 1
        self._security_policy = ua.SecurityPolicy()
200
201 1
    def set_security(self, policy):
202
        self._security_policy = policy
203
204 1
    def connect_socket(self, host, port):
205
        """
206
        connect to server socket and start receiving thread
207
        """
208 1
        self._uasocket = UASocketClient(self._timeout, security_policy=self._security_policy)
209 1
        return self._uasocket.connect_socket(host, port)
210
211 1
    def disconnect_socket(self):
212 1
        return self._uasocket.disconnect_socket()
213
214 1
    def send_hello(self, url):
215 1
        return self._uasocket.send_hello(url)
216
217 1
    def open_secure_channel(self, params):
218 1
        return self._uasocket.open_secure_channel(params)
219
220 1
    def close_secure_channel(self):
221
        """
222
        close secure channel. It seems to trigger a shutdown of socket
223
        in most servers, so be prepare to reconnect
224
        """
225 1
        return self._uasocket.close_secure_channel()
226
227 1
    def create_session(self, parameters):
228 1
        self.logger.info("create_session")
229 1
        request = ua.CreateSessionRequest()
230 1
        request.Parameters = parameters
231 1
        data = self._uasocket.send_request(request)
232 1
        response = ua.CreateSessionResponse.from_binary(data)
233 1
        response.ResponseHeader.ServiceResult.check()
234 1
        self._uasocket.authentication_token = response.Parameters.AuthenticationToken
235 1
        return response.Parameters
236
237 1
    def activate_session(self, parameters):
238 1
        self.logger.info("activate_session")
239 1
        request = ua.ActivateSessionRequest()
240 1
        request.Parameters = parameters
241 1
        data = self._uasocket.send_request(request)
242 1
        response = ua.ActivateSessionResponse.from_binary(data)
243 1
        response.ResponseHeader.ServiceResult.check()
244 1
        return response.Parameters
245
246 1
    def close_session(self, deletesubscriptions):
247 1
        self.logger.info("close_session")
248 1
        request = ua.CloseSessionRequest()
249 1
        request.DeleteSubscriptions = deletesubscriptions
250 1
        data = self._uasocket.send_request(request)
251 1
        ua.CloseSessionResponse.from_binary(data)
252
        # response.ResponseHeader.ServiceResult.check() #disabled, it seems we sent wrong session Id, but where is the sessionId supposed to be sent???
253
254 1
    def browse(self, parameters):
255 1
        self.logger.info("browse")
256 1
        request = ua.BrowseRequest()
257 1
        request.Parameters = parameters
258 1
        data = self._uasocket.send_request(request)
259 1
        response = ua.BrowseResponse.from_binary(data)
260 1
        response.ResponseHeader.ServiceResult.check()
261 1
        return response.Results
262
263 1
    def read(self, parameters):
264 1
        self.logger.info("read")
265 1
        request = ua.ReadRequest()
266 1
        request.Parameters = parameters
267 1
        data = self._uasocket.send_request(request)
268 1
        response = ua.ReadResponse.from_binary(data)
269 1
        response.ResponseHeader.ServiceResult.check()
270
        # cast to Enum attributes that need to
271 1
        for idx, rv in enumerate(parameters.NodesToRead):
272 1
            if rv.AttributeId == ua.AttributeIds.NodeClass:
273
                dv = response.Results[idx]
274
                if dv.StatusCode.is_good():
275
                    dv.Value.Value = ua.NodeClass(dv.Value.Value)
276 1
            elif rv.AttributeId == ua.AttributeIds.ValueRank:
277
                dv = response.Results[idx]
278
                if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
279
                    dv.Value.Value = ua.ValueRank(dv.Value.Value)
280 1
        return response.Results
281
282 1
    def write(self, params):
283 1
        self.logger.info("read")
284 1
        request = ua.WriteRequest()
285 1
        request.Parameters = params
286 1
        data = self._uasocket.send_request(request)
287 1
        response = ua.WriteResponse.from_binary(data)
288 1
        response.ResponseHeader.ServiceResult.check()
289 1
        return response.Results
290
291 1
    def get_endpoints(self, params):
292 1
        self.logger.info("get_endpoint")
293 1
        request = ua.GetEndpointsRequest()
294 1
        request.Parameters = params
295 1
        data = self._uasocket.send_request(request)
296 1
        response = ua.GetEndpointsResponse.from_binary(data)
297 1
        response.ResponseHeader.ServiceResult.check()
298 1
        return response.Endpoints
299
300 1
    def find_servers(self, params):
301 1
        self.logger.info("find_servers")
302 1
        request = ua.FindServersRequest()
303 1
        request.Parameters = params
304 1
        data = self._uasocket.send_request(request)
305 1
        response = ua.FindServersResponse.from_binary(data)
306 1
        response.ResponseHeader.ServiceResult.check()
307 1
        return response.Servers
308
309 1
    def find_servers_on_network(self, params):
310
        self.logger.info("find_servers_on_network")
311
        request = ua.FindServersOnNetworkRequest()
312
        request.Parameters = params
313
        data = self._uasocket.send_request(request)
314
        response = ua.FindServersOnNetworkResponse.from_binary(data)
315
        response.ResponseHeader.ServiceResult.check()
316
        return response.Parameters
317
318 1
    def register_server(self, registered_server):
319 1
        self.logger.info("register_server")
320 1
        request = ua.RegisterServerRequest()
321 1
        request.Server = registered_server
322 1
        data = self._uasocket.send_request(request)
323 1
        response = ua.RegisterServerResponse.from_binary(data)
324 1
        response.ResponseHeader.ServiceResult.check()
325
        # nothing to return for this service
326
327 1
    def register_server2(self, params):
328
        self.logger.info("register_server2")
329
        request = ua.RegisterServer2Request()
330
        request.Parameters = params
331
        data = self._uasocket.send_request(request)
332
        response = ua.RegisterServer2Response.from_binary(data)
333
        response.ResponseHeader.ServiceResult.check()
334
        return response.ConfigurationResults
335
336 1
    def translate_browsepaths_to_nodeids(self, browsepaths):
337 1
        self.logger.info("translate_browsepath_to_nodeid")
338 1
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
339 1
        request.Parameters.BrowsePaths = browsepaths
340 1
        data = self._uasocket.send_request(request)
341 1
        response = ua.TranslateBrowsePathsToNodeIdsResponse.from_binary(data)
342 1
        response.ResponseHeader.ServiceResult.check()
343 1
        return response.Results
344
345 1
    def create_subscription(self, params, callback):
346 1
        self.logger.info("create_subscription")
347 1
        request = ua.CreateSubscriptionRequest()
348 1
        request.Parameters = params
349 1
        resp_fut = Future()
350 1
        mycallbak = partial(self._create_subscription_callback, callback, resp_fut)
351 1
        self._uasocket.send_request(request, mycallbak)
352 1
        return resp_fut.result(self._timeout)
353
354 1
    def _create_subscription_callback(self, pub_callback, resp_fut, data_fut):
355 1
        self.logger.info("_create_subscription_callback")
356 1
        data = data_fut.result()
357 1
        response = ua.CreateSubscriptionResponse.from_binary(data)
358 1
        response.ResponseHeader.ServiceResult.check()
359 1
        self._publishcallbacks[response.Parameters.SubscriptionId] = pub_callback
360 1
        resp_fut.set_result(response.Parameters)
361
362 1
    def delete_subscriptions(self, subscriptionids):
363 1
        self.logger.info("delete_subscription")
364 1
        request = ua.DeleteSubscriptionsRequest()
365 1
        request.Parameters.SubscriptionIds = subscriptionids
366 1
        resp_fut = Future()
367 1
        mycallbak = partial(self._delete_subscriptions_callback, subscriptionids, resp_fut)
368 1
        self._uasocket.send_request(request, mycallbak)
369 1
        return resp_fut.result(self._timeout)
370
371 1
    def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut):
372 1
        self.logger.info("_delete_subscriptions_callback")
373 1
        data = data_fut.result()
374 1
        response = ua.DeleteSubscriptionsResponse.from_binary(data)
375 1
        response.ResponseHeader.ServiceResult.check()
376 1
        for sid in subscriptionids:
377 1
            self._publishcallbacks.pop(sid)
378 1
        resp_fut.set_result(response.Results)
379
380 1
    def publish(self, acks=None):
381 1
        self.logger.info("publish")
382 1
        if acks is None:
383 1
            acks = []
384 1
        request = ua.PublishRequest()
385 1
        request.Parameters.SubscriptionAcknowledgements = acks
386 1
        self._uasocket.send_request(request, self._call_publish_callback, timeout=int(9e8))  # timeout could be set to 0 but some servers to not support it
387
388 1
    def _call_publish_callback(self, future):
389 1
        self.logger.info("call_publish_callback")
390 1
        data = future.result()
391 1
        self._uasocket.check_answer(data, "ServiceFault received from server while waiting for publish response")
392 1
        try:
393 1
            response = ua.PublishResponse.from_binary(data)
394
        except Exception:
395
            self.logger.exception("Error parsing notificatipn from server")
396
            self.publish([]) #send publish request ot server so he does stop sending notifications
397
            return
398 1
        if response.Parameters.SubscriptionId not in self._publishcallbacks:
399
            self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
400
            return
401 1
        callback = self._publishcallbacks[response.Parameters.SubscriptionId]
402 1
        try:
403 1
            callback(response.Parameters)
404
        except Exception:  # we call client code, catch everything!
405
            self.logger.exception("Exception while calling user callback: %s")
406
407 1
    def create_monitored_items(self, params):
408 1
        self.logger.info("create_monitored_items")
409 1
        request = ua.CreateMonitoredItemsRequest()
410 1
        request.Parameters = params
411 1
        data = self._uasocket.send_request(request)
412 1
        response = ua.CreateMonitoredItemsResponse.from_binary(data)
413 1
        response.ResponseHeader.ServiceResult.check()
414 1
        return response.Results
415
416 1
    def delete_monitored_items(self, params):
417 1
        self.logger.info("delete_monitored_items")
418 1
        request = ua.DeleteMonitoredItemsRequest()
419 1
        request.Parameters = params
420 1
        data = self._uasocket.send_request(request)
421 1
        response = ua.DeleteMonitoredItemsResponse.from_binary(data)
422 1
        response.ResponseHeader.ServiceResult.check()
423 1
        return response.Results
424
425 1
    def add_nodes(self, nodestoadd):
426 1
        self.logger.info("add_nodes")
427 1
        request = ua.AddNodesRequest()
428 1
        request.Parameters.NodesToAdd = nodestoadd
429 1
        data = self._uasocket.send_request(request)
430 1
        response = ua.AddNodesResponse.from_binary(data)
431 1
        response.ResponseHeader.ServiceResult.check()
432 1
        return response.Results
433
434 1
    def delete_nodes(self, nodestodelete):
435 1
        self.logger.info("delete_nodes")
436 1
        request = ua.DeleteNodesRequest()
437 1
        request.Parameters.NodesToDelete = nodestodelete
438 1
        data = self._uasocket.send_request(request)
439 1
        response = ua.DeleteNodesResponse.from_binary(data)
440 1
        response.ResponseHeader.ServiceResult.check()
441 1
        return response.Results
442
443 1
    def call(self, methodstocall):
444 1
        request = ua.CallRequest()
445 1
        request.Parameters.MethodsToCall = methodstocall
446 1
        data = self._uasocket.send_request(request)
447 1
        response = ua.CallResponse.from_binary(data)
448 1
        response.ResponseHeader.ServiceResult.check()
449 1
        return response.Results
450
451 1
    def history_read(self, params):
452
        self.logger.info("history_read")
453
        request = ua.HistoryReadRequest()
454
        request.Parameters = params
455
        data = self._uasocket.send_request(request)
456
        response = ua.HistoryReadResponse.from_binary(data)
457
        response.ResponseHeader.ServiceResult.check()
458
        return response.Results
459