Completed
Push — master ( 5a9df5...39e3a6 )
by Olivier
223:12 queued 212:11
created

opcua.client.BinaryClient.__init__()   A

Complexity

Conditions 1

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 1
Metric Value
cc 1
dl 0
loc 7
ccs 6
cts 6
cp 1
crap 1
rs 9.4286
1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9 1
from functools import partial
10
11 1
from opcua import ua
12 1
from opcua.common import utils
13
14 1
class UASocketClient(object):
15
    """
16
    handle socket connection and send ua messages
17
    timeout is the timeout used while waiting for an ua answer from server
18
    """
19 1
    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy()):
20 1
        self.logger = logging.getLogger(__name__ + "Socket")
21 1
        self._thread = None
22 1
        self._lock = Lock()
23 1
        self.timeout = timeout
24 1
        self._socket = None
25 1
        self._do_stop = False
26 1
        self._security_token = ua.ChannelSecurityToken()
27 1
        self.authentication_token = ua.NodeId()
28 1
        self._sequence_number = 0
29 1
        self._request_id = 0
30 1
        self._request_handle = 0
31 1
        self._callbackmap = {}
32 1
        self._security_policy = security_policy
33 1
        self._max_chunk_size = 65536
34
35 1
    def start(self):
36
        """
37
        Start receiving thread.
38
        this is called automatically in connect and
39
        should not be necessary to call directly
40
        """
41 1
        self._thread = Thread(target=self._run)
42 1
        self._thread.start()
43
44 1
    def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
45
        """
46
        send request to server, lower-level method
47
        timeout is the timeout written in ua header
48
        returns future
49
        """
50 1
        with self._lock:
51 1
            request.RequestHeader = self._create_request_header(timeout)
52 1
            try:
53 1
                binreq = request.to_binary()
54
            except:
55
                # reset reqeust handle if any error
56
                # see self._create_request_header
57
                self._request_handle -= 1
58
                raise
59 1
            self._request_id += 1
60 1
            future = Future()
61 1
            if callback:
62 1
                future.add_done_callback(callback)
63 1
            self._callbackmap[self._request_id] = future
64 1
            for chunk in ua.MessageChunk.message_to_chunks(self._security_policy, binreq, self._max_chunk_size,
65
                    message_type=message_type,
66
                    channel_id=self._security_token.ChannelId,
67
                    request_id=self._request_id,
68
                    token_id=self._security_token.TokenId):
69 1
                self._sequence_number += 1
70 1
                chunk.SequenceHeader.SequenceNumber = self._sequence_number
71 1
                self._socket.write(chunk.to_binary())
72 1
        return future
73
74 1
    def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
75
        """
76
        send request to server.
77
        timeout is the timeout written in ua header
78
        returns response object if no callback is provided
79
        """
80 1
        future = self._send_request(request, callback, timeout, message_type)
81 1
        if not callback:
82 1
            data = future.result(self.timeout)
83 1
            self.check_answer(data, " in response to " + request.__class__.__name__)
84 1
            return data
85
86 1
    def check_answer(self, data, context):
87 1
        data = data.copy()
88 1
        typeid = ua.NodeId.from_binary(data)
89 1
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
90
            self.logger.warning("ServiceFault from server received %s", context)
91
            hdr = ua.ResponseHeader.from_binary(data)
92
            hdr.ServiceResult.check()
93
            return False
94 1
        return True
95
96 1
    def _run(self):
97 1
        self.logger.info("Thread started")
98 1
        while not self._do_stop:
99 1
            try:
100 1
                self._receive()
101 1
            except ua.utils.SocketClosedException:
102 1
                self.logger.info("Socket has closed connection")
103 1
                break
104 1
        self.logger.info("Thread ended")
105
106 1
    def _receive(self):
107 1
        msg = ua.tcp_message_from_socket(self._security_policy, self._socket)
108 1
        if isinstance(msg, ua.MessageChunk):
109 1
            chunks = [msg]
110
            # TODO: check everything
111 1
            while chunks[-1].MessageHeader.ChunkType == ua.ChunkType.Intermediate:
112
                chunks.append(ua.tcp_message_from_socket(self._security_policy, self._socket))
113 1
            body = b"".join([c.Body for c in chunks])
114 1
            self._call_callback(msg.SequenceHeader.RequestId, utils.Buffer(body))
115 1
        elif isinstance(msg, ua.Acknowledge):
116 1
            self._call_callback(0, msg)
117
        elif isinstance(msg, ua.ErrorMessage):
118
            self.logger.warning("Received an error: {}".format(msg))
119
        else:
120
            raise Exception("Unsupported message type: {}".format(msg))
121
122 1
    def _call_callback(self, request_id, body):
123 1
        with self._lock:
124 1
            future = self._callbackmap.pop(request_id, None)
125 1
            if future is None:
126 1
                raise Exception("No future object found for request: {}, callbacks in list are {}".format(request_id, self._callbackmap.keys()))
127 1
        future.set_result(body)
128
129 1
    def _write_socket(self, hdr, *args):
130 1
        alle = []
131 1
        for arg in args:
132 1
            data = arg.to_binary()
133 1
            hdr.add_size(len(data))
134 1
            self.logger.debug("writting to socket: %s with length %s ", type(arg), len(data))
135 1
            self.logger.debug("struct: %s", arg)
136 1
            self.logger.debug("data: %s", data)
137 1
            alle.append(data)
138 1
        alle.insert(0, hdr.to_binary())
139 1
        alle = b"".join(alle)
140 1
        self._socket.write(alle)
141
142 1
    def _create_request_header(self, timeout=1000):
143 1
        hdr = ua.RequestHeader()
144 1
        hdr.AuthenticationToken = self.authentication_token
145 1
        self._request_handle += 1
146 1
        hdr.RequestHandle = self._request_handle
147 1
        hdr.TimeoutHint = timeout
148 1
        return hdr
149
150 1
    def connect_socket(self, host, port):
151
        """
152
        connect to server socket and start receiving thread
153
        """
154 1
        self.logger.info("opening connection")
155 1
        sock = socket.create_connection((host, port))
156 1
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # nodelay ncessary to avoid packing in one frame, some servers do not like it
157 1
        self._socket = utils.SocketWrapper(sock)
158 1
        self.start()
159
160 1
    def disconnect_socket(self):
161 1
        self.logger.info("stop request")
162 1
        self._do_stop = True
163 1
        self._socket.socket.shutdown(socket.SHUT_WR)
164
165 1
    def send_hello(self, url):
166 1
        hello = ua.Hello()
167 1
        hello.EndpointUrl = url
168 1
        header = ua.Header(ua.MessageType.Hello, ua.ChunkType.Single)
169 1
        future = Future()
170 1
        with self._lock:
171 1
            self._callbackmap[0] = future
172 1
        self._write_socket(header, hello)
173 1
        ack = future.result(self.timeout)
174 1
        self._max_chunk_size = ack.SendBufferSize	# client shouldn't send chunks larger than this
175 1
        return ack
176
177 1
    def open_secure_channel(self, params):
178 1
        self.logger.info("open_secure_channel")
179 1
        request = ua.OpenSecureChannelRequest()
180 1
        request.Parameters = params
181 1
        future = self._send_request(request, message_type=ua.MessageType.SecureOpen)
182
183 1
        response = ua.OpenSecureChannelResponse.from_binary(future.result(self.timeout))
184 1
        response.ResponseHeader.ServiceResult.check()
185 1
        self._security_token = response.Parameters.SecurityToken
186 1
        return response.Parameters
187
188 1
    def close_secure_channel(self):
189
        """
190
        close secure channel. It seems to trigger a shutdown of socket
191
        in most servers, so be prepare to reconnect.
192
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close socket
193
        """
194 1
        self.logger.info("close_secure_channel")
195 1
        request = ua.CloseSecureChannelRequest()
196 1
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
197 1
        with self._lock:
198
            # don't expect any more answers
199 1
            future.cancel()
200 1
            self._callbackmap.clear()
201
202
        # some servers send a response here, most do not ... so we ignore
203
204
205
206 1
class BinaryClient(object):
207
208
    """
209
    low level OPC-UA client.
210
    implement all(well..one day) methods defined in opcua spec
211
    taking in argument the structures defined in opcua spec
212
    in python most of the structures are defined in
213
    uaprotocol_auto.py and uaprotocol_hand.py
214
    """
215
216 1
    def __init__(self, timeout=1):
217 1
        self.logger = logging.getLogger(__name__)
218
        # _publishcallbacks should be accessed in recv thread only
219 1
        self._publishcallbacks = {}
220 1
        self._timeout = timeout
221 1
        self._uasocket = None
222 1
        self._security_policy = ua.SecurityPolicy()
223
224 1
    def set_security(self, policy):
225
        self._security_policy = policy
226
227 1
    def connect_socket(self, host, port):
228
        """
229
        connect to server socket and start receiving thread
230
        """
231 1
        self._uasocket = UASocketClient(self._timeout, security_policy=self._security_policy)
232 1
        return self._uasocket.connect_socket(host, port)
233
234 1
    def disconnect_socket(self):
235 1
        return self._uasocket.disconnect_socket()
236
237 1
    def send_hello(self, url):
238 1
        return self._uasocket.send_hello(url)
239
240 1
    def open_secure_channel(self, params):
241 1
        return self._uasocket.open_secure_channel(params)
242
243 1
    def close_secure_channel(self):
244
        """
245
        close secure channel. It seems to trigger a shutdown of socket
246
        in most servers, so be prepare to reconnect
247
        """
248 1
        return self._uasocket.close_secure_channel()
249
250 1
    def create_session(self, parameters):
251 1
        self.logger.info("create_session")
252 1
        request = ua.CreateSessionRequest()
253 1
        request.Parameters = parameters
254 1
        data = self._uasocket.send_request(request)
255 1
        response = ua.CreateSessionResponse.from_binary(data)
256 1
        response.ResponseHeader.ServiceResult.check()
257 1
        self._uasocket.authentication_token = response.Parameters.AuthenticationToken
258 1
        return response.Parameters
259
260 1
    def activate_session(self, parameters):
261 1
        self.logger.info("activate_session")
262 1
        request = ua.ActivateSessionRequest()
263 1
        request.Parameters = parameters
264 1
        data = self._uasocket.send_request(request)
265 1
        response = ua.ActivateSessionResponse.from_binary(data)
266 1
        response.ResponseHeader.ServiceResult.check()
267 1
        return response.Parameters
268
269 1
    def close_session(self, deletesubscriptions):
270 1
        self.logger.info("close_session")
271 1
        request = ua.CloseSessionRequest()
272 1
        request.DeleteSubscriptions = deletesubscriptions
273 1
        data = self._uasocket.send_request(request)
274 1
        ua.CloseSessionResponse.from_binary(data)
275
        # response.ResponseHeader.ServiceResult.check() #disabled, it seems we sent wrong session Id, but where is the sessionId supposed to be sent???
276
277 1
    def browse(self, parameters):
278 1
        self.logger.info("browse")
279 1
        request = ua.BrowseRequest()
280 1
        request.Parameters = parameters
281 1
        data = self._uasocket.send_request(request)
282 1
        response = ua.BrowseResponse.from_binary(data)
283 1
        response.ResponseHeader.ServiceResult.check()
284 1
        return response.Results
285
286 1
    def read(self, parameters):
287 1
        self.logger.info("read")
288 1
        request = ua.ReadRequest()
289 1
        request.Parameters = parameters
290 1
        data = self._uasocket.send_request(request)
291 1
        response = ua.ReadResponse.from_binary(data)
292 1
        response.ResponseHeader.ServiceResult.check()
293
        # cast to Enum attributes that need to
294 1
        for idx, rv in enumerate(parameters.NodesToRead):
295 1
            if rv.AttributeId == ua.AttributeIds.NodeClass:
296
                dv = response.Results[idx]
297
                if dv.StatusCode.is_good():
298
                    dv.Value.Value = ua.NodeClass(dv.Value.Value)
299 1
            elif rv.AttributeId == ua.AttributeIds.ValueRank:
300
                dv = response.Results[idx]
301
                if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
302
                    dv.Value.Value = ua.ValueRank(dv.Value.Value)
303 1
        return response.Results
304
305 1
    def write(self, params):
306 1
        self.logger.info("read")
307 1
        request = ua.WriteRequest()
308 1
        request.Parameters = params
309 1
        data = self._uasocket.send_request(request)
310 1
        response = ua.WriteResponse.from_binary(data)
311 1
        response.ResponseHeader.ServiceResult.check()
312 1
        return response.Results
313
314 1
    def get_endpoints(self, params):
315 1
        self.logger.info("get_endpoint")
316 1
        request = ua.GetEndpointsRequest()
317 1
        request.Parameters = params
318 1
        data = self._uasocket.send_request(request)
319 1
        response = ua.GetEndpointsResponse.from_binary(data)
320 1
        response.ResponseHeader.ServiceResult.check()
321 1
        return response.Endpoints
322
323 1
    def find_servers(self, params):
324 1
        self.logger.info("find_servers")
325 1
        request = ua.FindServersRequest()
326 1
        request.Parameters = params
327 1
        data = self._uasocket.send_request(request)
328 1
        response = ua.FindServersResponse.from_binary(data)
329 1
        response.ResponseHeader.ServiceResult.check()
330 1
        return response.Servers
331
332 1
    def find_servers_on_network(self, params):
333
        self.logger.info("find_servers_on_network")
334
        request = ua.FindServersOnNetworkRequest()
335
        request.Parameters = params
336
        data = self._uasocket.send_request(request)
337
        response = ua.FindServersOnNetworkResponse.from_binary(data)
338
        response.ResponseHeader.ServiceResult.check()
339
        return response.Parameters
340
341 1
    def register_server(self, registered_server):
342 1
        self.logger.info("register_server")
343 1
        request = ua.RegisterServerRequest()
344 1
        request.Server = registered_server
345 1
        data = self._uasocket.send_request(request)
346 1
        response = ua.RegisterServerResponse.from_binary(data)
347 1
        response.ResponseHeader.ServiceResult.check()
348
        # nothing to return for this service
349
350 1
    def register_server2(self, params):
351
        self.logger.info("register_server2")
352
        request = ua.RegisterServer2Request()
353
        request.Parameters = params
354
        data = self._uasocket.send_request(request)
355
        response = ua.RegisterServer2Response.from_binary(data)
356
        response.ResponseHeader.ServiceResult.check()
357
        return response.ConfigurationResults
358
359 1
    def translate_browsepaths_to_nodeids(self, browsepaths):
360 1
        self.logger.info("translate_browsepath_to_nodeid")
361 1
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
362 1
        request.Parameters.BrowsePaths = browsepaths
363 1
        data = self._uasocket.send_request(request)
364 1
        response = ua.TranslateBrowsePathsToNodeIdsResponse.from_binary(data)
365 1
        response.ResponseHeader.ServiceResult.check()
366 1
        return response.Results
367
368 1
    def create_subscription(self, params, callback):
369 1
        self.logger.info("create_subscription")
370 1
        request = ua.CreateSubscriptionRequest()
371 1
        request.Parameters = params
372 1
        resp_fut = Future()
373 1
        mycallbak = partial(self._create_subscription_callback, callback, resp_fut)
374 1
        self._uasocket.send_request(request, mycallbak)
375 1
        return resp_fut.result(self._timeout)
376
377 1
    def _create_subscription_callback(self, pub_callback, resp_fut, data_fut):
378 1
        self.logger.info("_create_subscription_callback")
379 1
        data = data_fut.result()
380 1
        response = ua.CreateSubscriptionResponse.from_binary(data)
381 1
        response.ResponseHeader.ServiceResult.check()
382 1
        self._publishcallbacks[response.Parameters.SubscriptionId] = pub_callback
383 1
        resp_fut.set_result(response.Parameters)
384
385 1
    def delete_subscriptions(self, subscriptionids):
386 1
        self.logger.info("delete_subscription")
387 1
        request = ua.DeleteSubscriptionsRequest()
388 1
        request.Parameters.SubscriptionIds = subscriptionids
389 1
        resp_fut = Future()
390 1
        mycallbak = partial(self._delete_subscriptions_callback, subscriptionids, resp_fut)
391 1
        self._uasocket.send_request(request, mycallbak)
392 1
        return resp_fut.result(self._timeout)
393
394 1
    def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut):
395 1
        self.logger.info("_delete_subscriptions_callback")
396 1
        data = data_fut.result()
397 1
        response = ua.DeleteSubscriptionsResponse.from_binary(data)
398 1
        response.ResponseHeader.ServiceResult.check()
399 1
        for sid in subscriptionids:
400 1
            self._publishcallbacks.pop(sid)
401 1
        resp_fut.set_result(response.Results)
402
403 1
    def publish(self, acks=None):
404 1
        self.logger.info("publish")
405 1
        if acks is None:
406 1
            acks = []
407 1
        request = ua.PublishRequest()
408 1
        request.Parameters.SubscriptionAcknowledgements = acks
409 1
        self._uasocket.send_request(request, self._call_publish_callback, timeout=int(9e8))  # timeout could be set to 0 but some servers to not support it
410
411 1
    def _call_publish_callback(self, future):
412 1
        self.logger.info("call_publish_callback")
413 1
        data = future.result()
414 1
        self._uasocket.check_answer(data, "ServiceFault received from server while waiting for publish response")
415 1
        response = ua.PublishResponse.from_binary(data)
416 1
        if response.Parameters.SubscriptionId not in self._publishcallbacks:
417
            self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
418
            return
419 1
        callback = self._publishcallbacks[response.Parameters.SubscriptionId]
420 1
        try:
421 1
            callback(response.Parameters)
422
        except Exception:  # we call client code, catch everything!
423
            self.logger.exception("Exception while calling user callback: %s")
424
425 1
    def create_monitored_items(self, params):
426 1
        self.logger.info("create_monitored_items")
427 1
        request = ua.CreateMonitoredItemsRequest()
428 1
        request.Parameters = params
429 1
        data = self._uasocket.send_request(request)
430 1
        response = ua.CreateMonitoredItemsResponse.from_binary(data)
431 1
        response.ResponseHeader.ServiceResult.check()
432 1
        return response.Results
433
434 1
    def delete_monitored_items(self, params):
435 1
        self.logger.info("delete_monitored_items")
436 1
        request = ua.DeleteMonitoredItemsRequest()
437 1
        request.Parameters = params
438 1
        data = self._uasocket.send_request(request)
439 1
        response = ua.DeleteMonitoredItemsResponse.from_binary(data)
440 1
        response.ResponseHeader.ServiceResult.check()
441 1
        return response.Results
442
443 1
    def add_nodes(self, nodestoadd):
444 1
        self.logger.info("add_nodes")
445 1
        request = ua.AddNodesRequest()
446 1
        request.Parameters.NodesToAdd = nodestoadd
447 1
        data = self._uasocket.send_request(request)
448 1
        response = ua.AddNodesResponse.from_binary(data)
449 1
        response.ResponseHeader.ServiceResult.check()
450 1
        return response.Results
451
452 1
    def call(self, methodstocall):
453 1
        request = ua.CallRequest()
454 1
        request.Parameters.MethodsToCall = methodstocall
455 1
        data = self._uasocket.send_request(request)
456 1
        response = ua.CallResponse.from_binary(data)
457 1
        response.ResponseHeader.ServiceResult.check()
458 1
        return response.Results
459
460 1
    def history_read(self, params):
461
        self.logger.info("history_read")
462
        request = ua.HistoryReadRequest()
463
        request.Parameters = params
464
        data = self._uasocket.send_request(request)
465
        response = ua.HistoryReadResponse.from_binary(data)
466
        response.ResponseHeader.ServiceResult.check()
467
        return response.Results
468