Passed
Push — dev ( eed7aa...454bf6 )
by Olivier
05:18 queued 02:57
created

opcua.BinaryClient.read()   B

Complexity

Conditions 7

Size

Total Lines 18

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 9.1535
Metric Value
dl 0
loc 18
ccs 11
cts 17
cp 0.6471
rs 7.3333
cc 7
crap 9.1535
1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9
10 1
import opcua.uaprotocol as ua
11 1
import opcua.utils as utils
12
13 1
class UASocketClient(object):
    """
    Handle the socket connection and send ua messages.

    timeout is the number of seconds to wait (via Future.result) for an ua
    answer from the server.
    security_policy is handed to the message chunking/parsing helpers; when
    it is omitted a new ua.SecurityPolicy() is created for this client.
    """

    def __init__(self, timeout=1, security_policy=None):
        self.logger = logging.getLogger(__name__ + "Socket")
        self._thread = None
        # guards _callbackmap and the request/sequence counters
        self._lock = Lock()
        self.timeout = timeout
        self._socket = None
        self._do_stop = False
        self._security_token = ua.ChannelSecurityToken()
        self.authentication_token = ua.NodeId()
        self._sequence_number = 0
        self._request_id = 0
        self._request_handle = 0
        # request id -> Future waiting for the matching answer
        self._callbackmap = {}
        # Create the default policy here rather than in the signature: a
        # default argument is evaluated only once, so all clients would
        # otherwise share the very same policy object.
        if security_policy is None:
            security_policy = ua.SecurityPolicy()
        self._security_policy = security_policy
        # replaced by the server's SendBufferSize in send_hello
        self._max_chunk_size = 65536

    def start(self):
        """
        Start receiving thread.
        This is called automatically in connect_socket and
        should not be necessary to call directly.
        """
        self._thread = Thread(target=self._run)
        self._thread.start()

    def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        Send request to server, lower-level method.
        timeout is the timeout written in the ua request header.
        callback, if given, is attached to the future as a done-callback.
        Returns the future that will receive the answer body.
        """
        with self._lock:
            request.RequestHeader = self._create_request_header(timeout)
            try:
                binreq = request.to_binary()
            except BaseException:  # always re-raised, we only undo the counter
                # reset request handle if any error
                # see self._create_request_header
                self._request_handle -= 1
                raise
            self._request_id += 1
            future = Future()
            if callback:
                future.add_done_callback(callback)
            # register before writing so the receiving thread can find it
            self._callbackmap[self._request_id] = future
            chunks = ua.MessageChunk.message_to_chunks(
                self._security_policy, binreq, self._max_chunk_size,
                message_type=message_type,
                channel_id=self._security_token.ChannelId,
                request_id=self._request_id,
                token_id=self._security_token.TokenId)
            for chunk in chunks:
                self._sequence_number += 1
                chunk.SequenceHeader.SequenceNumber = self._sequence_number
                self._socket.write(chunk.to_binary())
        return future

    def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        Send request to server.
        timeout is the timeout written in the ua request header.
        Returns the response data if no callback is provided, after waiting
        at most self.timeout for it; with a callback nothing is returned and
        the callback receives the future once the answer arrives.
        """
        future = self._send_request(request, callback, timeout, message_type)
        if not callback:
            data = future.result(self.timeout)
            self.check_answer(data, " in response to " + request.__class__.__name__)
            return data

    def check_answer(self, data, context):
        """
        Inspect the type id at the start of an answer.

        If it is a ServiceFault, log a warning containing context, decode
        the response header and call check() on its ServiceResult
        (presumably raising for a bad status -- verify in ua), then return
        False. Return True for any other type id.
        """
        # parse a copy; presumably this leaves the caller's buffer unread
        data = data.copy()
        typeid = ua.NodeId.from_binary(data)
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
            self.logger.warning("ServiceFault from server received %s", context)
            hdr = ua.ResponseHeader.from_binary(data)
            hdr.ServiceResult.check()
            return False
        return True

    def _run(self):
        """
        Body of the receiving thread: call _receive until a stop is
        requested or the socket reports that it has been closed.
        """
        self.logger.info("Thread started")
        while not self._do_stop:
            try:
                self._receive()
            except ua.utils.SocketClosedException:
                self.logger.info("Socket has closed connection")
                break
        self.logger.info("Thread ended")

    def _receive(self):
        """
        Read one message from the socket and dispatch it.

        Chunked messages are read until a non-intermediate chunk and their
        bodies are joined before the future of the request is resolved; an
        Acknowledge resolves the future registered under id 0; an
        ErrorMessage is only logged. Any other message type raises.
        """
        msg = ua.tcp_message_from_socket(self._security_policy, self._socket)
        if isinstance(msg, ua.MessageChunk):
            chunks = [msg]
            # TODO: check everything
            while chunks[-1].MessageHeader.ChunkType == ua.ChunkType.Intermediate:
                chunks.append(ua.tcp_message_from_socket(self._security_policy, self._socket))
            body = b"".join([c.Body for c in chunks])
            self._call_callback(msg.SequenceHeader.RequestId, utils.Buffer(body))
        elif isinstance(msg, ua.Acknowledge):
            self._call_callback(0, msg)
        elif isinstance(msg, ua.ErrorMessage):
            self.logger.warning("Received an error: %s", msg)
        else:
            raise Exception("Unsupported message type: {}".format(msg))

    def _call_callback(self, request_id, body):
        """
        Resolve the future registered for request_id with body.
        Raises if no future is registered under that id.
        """
        with self._lock:
            future = self._callbackmap.pop(request_id, None)
            if future is None:
                raise Exception("No future object found for request: {}, callbacks in list are {}".format(request_id, self._callbackmap.keys()))
        # set the result outside the lock: done-callbacks run from here
        future.set_result(body)

    def _write_socket(self, hdr, *args):
        """
        Serialize every object in args, add their sizes to hdr, and write
        the header followed by all payloads to the socket in a single call.
        """
        alle = []
        for arg in args:
            data = arg.to_binary()
            hdr.add_size(len(data))
            self.logger.debug("writting to socket: %s with length %s ", type(arg), len(data))
            self.logger.debug("struct: %s", arg)
            self.logger.debug("data: %s", data)
            alle.append(data)
        # the header is serialized last because it needs the total size
        alle.insert(0, hdr.to_binary())
        alle = b"".join(alle)
        self._socket.write(alle)

    def _create_request_header(self, timeout=1000):
        """
        Build a RequestHeader carrying the current authentication token,
        a newly incremented request handle and timeout as TimeoutHint.
        """
        hdr = ua.RequestHeader()
        hdr.AuthenticationToken = self.authentication_token
        self._request_handle += 1
        hdr.RequestHandle = self._request_handle
        hdr.TimeoutHint = timeout
        return hdr

    def connect_socket(self, host, port):
        """
        Connect to server socket and start receiving thread.
        """
        self.logger.info("opening connection")
        sock = socket.create_connection((host, port))
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # nodelay necessary to avoid packing in one frame, some servers do not like it
        self._socket = utils.SocketWrapper(sock)
        self.start()

    def disconnect_socket(self):
        """
        Ask the receiving thread to stop and shut down the sending side
        of the socket.
        """
        self.logger.info("stop request")
        self._do_stop = True
        self._socket.socket.shutdown(socket.SHUT_WR)

    def send_hello(self, url):
        """
        Send a Hello message for url and wait up to self.timeout for the
        Acknowledge, which is returned. The acknowledged SendBufferSize
        becomes the maximum chunk size for subsequent requests.
        """
        hello = ua.Hello()
        hello.EndpointUrl = url
        header = ua.Header(ua.MessageType.Hello, ua.ChunkType.Single)
        future = Future()
        with self._lock:
            # the Acknowledge is dispatched to id 0 by _receive
            self._callbackmap[0] = future
        self._write_socket(header, hello)
        ack = future.result(self.timeout)
        self._max_chunk_size = ack.SendBufferSize  # client shouldn't send chunks larger than this
        return ack

    def open_secure_channel(self, params):
        """
        Send an OpenSecureChannelRequest with params, store the security
        token from the answer and return the response parameters.
        """
        self.logger.info("open_secure_channel")
        request = ua.OpenSecureChannelRequest()
        request.Parameters = params
        future = self._send_request(request, message_type=ua.MessageType.SecureOpen)

        response = ua.OpenSecureChannelResponse.from_binary(future.result(self.timeout))
        response.ResponseHeader.ServiceResult.check()
        self._security_token = response.Parameters.SecurityToken
        return response.Parameters

    def close_secure_channel(self):
        """
        Close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepared to reconnect.
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close socket
        """
        self.logger.info("close_secure_channel")
        request = ua.CloseSecureChannelRequest()
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
        with self._lock:
            # don't expect any more answers
            future.cancel()
            self._callbackmap.clear()

        # some servers send a response here, most do not ... so we ignore
202
203
204
205 1
class BinaryClient(object):

    """
    Low level OPC-UA client.
    Implements all (well.. one day) methods defined in the opcua spec,
    taking as arguments the structures defined in the opcua spec.
    In python most of the structures are defined in
    uaprotocol_auto.py and uaprotocol_hand.py

    timeout is forwarded to the UASocketClient created in connect_socket.
    """

    def __init__(self, timeout=1):
        self.logger = logging.getLogger(__name__)
        # subscription id -> user callback receiving publish results
        self._publishcallbacks = {}
        # guards _publishcallbacks
        self._lock = Lock()
        self._timeout = timeout
        self._uasocket = None
        self._security_policy = ua.SecurityPolicy()

    def _send_and_check(self, request, response_class):
        """
        Send request, decode the answer as response_class, call check() on
        the ServiceResult of its header and return the decoded response.
        """
        data = self._uasocket.send_request(request)
        response = response_class.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response

    def set_security(self, policy):
        """
        Set the security policy used by the socket client created on the
        next connect_socket call.
        """
        self._security_policy = policy

    def connect_socket(self, host, port):
        """
        Connect to server socket and start receiving thread.
        """
        self._uasocket = UASocketClient(self._timeout, security_policy=self._security_policy)
        return self._uasocket.connect_socket(host, port)

    def disconnect_socket(self):
        """
        Forward to UASocketClient.disconnect_socket.
        """
        return self._uasocket.disconnect_socket()

    def send_hello(self, url):
        """
        Forward to UASocketClient.send_hello and return its result.
        """
        return self._uasocket.send_hello(url)

    def open_secure_channel(self, params):
        """
        Forward to UASocketClient.open_secure_channel and return its result.
        """
        return self._uasocket.open_secure_channel(params)

    def close_secure_channel(self):
        """
        Close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepared to reconnect.
        """
        return self._uasocket.close_secure_channel()

    def create_session(self, parameters):
        """
        Send a CreateSessionRequest, remember the returned authentication
        token on the socket client and return the response parameters.
        """
        self.logger.info("create_session")
        request = ua.CreateSessionRequest()
        request.Parameters = parameters
        response = self._send_and_check(request, ua.CreateSessionResponse)
        self._uasocket.authentication_token = response.Parameters.AuthenticationToken
        return response.Parameters

    def activate_session(self, parameters):
        """
        Send an ActivateSessionRequest and return the response parameters.
        """
        self.logger.info("activate_session")
        request = ua.ActivateSessionRequest()
        request.Parameters = parameters
        return self._send_and_check(request, ua.ActivateSessionResponse).Parameters

    def close_session(self, deletesubscriptions):
        """
        Send a CloseSessionRequest. The answer is decoded but its service
        result is deliberately not checked (see comment below).
        """
        self.logger.info("close_session")
        request = ua.CloseSessionRequest()
        request.DeleteSubscriptions = deletesubscriptions
        data = self._uasocket.send_request(request)
        ua.CloseSessionResponse.from_binary(data)
        # response.ResponseHeader.ServiceResult.check() #disabled, it seems we sent wrong session Id, but where is the sessionId supposed to be sent???

    def browse(self, parameters):
        """
        Send a BrowseRequest and return the response results.
        """
        self.logger.info("browse")
        request = ua.BrowseRequest()
        request.Parameters = parameters
        return self._send_and_check(request, ua.BrowseResponse).Results

    def read(self, parameters):
        """
        Send a ReadRequest and return the response results.

        Good results for the NodeClass attribute are converted to
        ua.NodeClass; good results for the ValueRank attribute are converted
        to ua.ValueRank when the value is one of -3..4.
        """
        self.logger.info("read")
        request = ua.ReadRequest()
        request.Parameters = parameters
        response = self._send_and_check(request, ua.ReadResponse)
        # cast to Enum attributes that need to
        for idx, rv in enumerate(parameters.NodesToRead):
            if rv.AttributeId == ua.AttributeIds.NodeClass:
                dv = response.Results[idx]
                if dv.StatusCode.is_good():
                    dv.Value.Value = ua.NodeClass(dv.Value.Value)
            elif rv.AttributeId == ua.AttributeIds.ValueRank:
                dv = response.Results[idx]
                if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
                    dv.Value.Value = ua.ValueRank(dv.Value.Value)
        return response.Results

    def write(self, params):
        """
        Send a WriteRequest and return the response results.
        """
        self.logger.info("write")
        request = ua.WriteRequest()
        request.Parameters = params
        return self._send_and_check(request, ua.WriteResponse).Results

    def get_endpoints(self, params):
        """
        Send a GetEndpointsRequest and return the endpoints of the response.
        """
        self.logger.info("get_endpoint")
        request = ua.GetEndpointsRequest()
        request.Parameters = params
        return self._send_and_check(request, ua.GetEndpointsResponse).Endpoints

    def find_servers(self, params):
        """
        Send a FindServersRequest and return the servers of the response.
        """
        self.logger.info("find_servers")
        request = ua.FindServersRequest()
        request.Parameters = params
        return self._send_and_check(request, ua.FindServersResponse).Servers

    def find_servers_on_network(self, params):
        """
        Send a FindServersOnNetworkRequest and return the response parameters.
        """
        self.logger.info("find_servers_on_network")
        request = ua.FindServersOnNetworkRequest()
        request.Parameters = params
        return self._send_and_check(request, ua.FindServersOnNetworkResponse).Parameters

    def register_server(self, registered_server):
        """
        Send a RegisterServerRequest for registered_server.
        """
        self.logger.info("register_server")
        request = ua.RegisterServerRequest()
        request.Server = registered_server
        self._send_and_check(request, ua.RegisterServerResponse)
        # nothing to return for this service

    def register_server2(self, params):
        """
        Send a RegisterServer2Request and return the configuration results.
        """
        self.logger.info("register_server2")
        request = ua.RegisterServer2Request()
        request.Parameters = params
        return self._send_and_check(request, ua.RegisterServer2Response).ConfigurationResults

    def translate_browsepaths_to_nodeids(self, browsepaths):
        """
        Send a TranslateBrowsePathsToNodeIdsRequest for browsepaths and
        return the response results.
        """
        self.logger.info("translate_browsepath_to_nodeid")
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
        request.Parameters.BrowsePaths = browsepaths
        return self._send_and_check(request, ua.TranslateBrowsePathsToNodeIdsResponse).Results

    def create_subscription(self, params, callback):
        """
        Send a CreateSubscriptionRequest, register callback under the new
        subscription id and return the response parameters.
        callback is later called with the parameters of publish responses
        for that subscription.
        """
        self.logger.info("create_subscription")
        request = ua.CreateSubscriptionRequest()
        request.Parameters = params
        response = self._send_and_check(request, ua.CreateSubscriptionResponse)
        with self._lock:
            self._publishcallbacks[response.Parameters.SubscriptionId] = callback
        return response.Parameters

    def delete_subscriptions(self, subscriptionids):
        """
        Send a DeleteSubscriptionsRequest, forget the callbacks registered
        for subscriptionids and return the response results.
        """
        self.logger.info("delete_subscription")
        request = ua.DeleteSubscriptionsRequest()
        request.Parameters.SubscriptionIds = subscriptionids
        response = self._send_and_check(request, ua.DeleteSubscriptionsResponse)
        with self._lock:
            for sid in subscriptionids:
                # tolerate ids we never registered: the request has already
                # been accepted, so a KeyError here would only leave the
                # remaining callbacks behind and lose the results
                self._publishcallbacks.pop(sid, None)
        return response.Results

    def publish(self, acks=None):
        """
        Send a PublishRequest acknowledging acks (none by default).
        The answer is handled asynchronously by _call_publish_callback.
        """
        self.logger.info("publish")
        if acks is None:
            acks = []
        request = ua.PublishRequest()
        request.Parameters.SubscriptionAcknowledgements = acks
        self._uasocket.send_request(request, self._call_publish_callback, timeout=int(9e8))  # timeout could be set to 0 but some servers to not support it

    def _call_publish_callback(self, future):
        """
        Done-callback for publish requests: decode the PublishResponse and
        hand its parameters to the callback registered for the subscription.
        Exceptions raised by the user callback are logged, not propagated.
        """
        self.logger.info("call_publish_callback")
        data = future.result()
        self._uasocket.check_answer(data, "ServiceFault received from server while waiting for publish response")
        response = ua.PublishResponse.from_binary(data)
        with self._lock:
            if response.Parameters.SubscriptionId not in self._publishcallbacks:
                self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
                return
            callback = self._publishcallbacks[response.Parameters.SubscriptionId]
        # call user code outside the lock
        try:
            callback(response.Parameters)
        except Exception:  # we call client code, catch everything!
            self.logger.exception("Exception while calling user callback: %s", callback)

    def create_monitored_items(self, params):
        """
        Send a CreateMonitoredItemsRequest and return the response results.
        """
        self.logger.info("create_monitored_items")
        request = ua.CreateMonitoredItemsRequest()
        request.Parameters = params
        return self._send_and_check(request, ua.CreateMonitoredItemsResponse).Results

    def delete_monitored_items(self, params):
        """
        Send a DeleteMonitoredItemsRequest and return the response results.
        """
        self.logger.info("delete_monitored_items")
        request = ua.DeleteMonitoredItemsRequest()
        request.Parameters = params
        return self._send_and_check(request, ua.DeleteMonitoredItemsResponse).Results

    def add_nodes(self, nodestoadd):
        """
        Send an AddNodesRequest for nodestoadd and return the response results.
        """
        self.logger.info("add_nodes")
        request = ua.AddNodesRequest()
        request.Parameters.NodesToAdd = nodestoadd
        return self._send_and_check(request, ua.AddNodesResponse).Results

    def call(self, methodstocall):
        """
        Send a CallRequest for methodstocall and return the response results.
        """
        self.logger.info("call")
        request = ua.CallRequest()
        request.Parameters.MethodsToCall = methodstocall
        return self._send_and_check(request, ua.CallResponse).Results

    def history_read(self, params):
        """
        Send a HistoryReadRequest and return the response results.
        """
        self.logger.info("history_read")
        request = ua.HistoryReadRequest()
        request.Parameters = params
        return self._send_and_check(request, ua.HistoryReadResponse).Results
456