Completed
Pull Request — master (#104)
by
unknown
66:47 queued 61:24
created

opcua.client.UASocketClient._write_socket()   A

Complexity

Conditions 2

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 2
Metric Value
cc 2
dl 0
loc 12
ccs 12
cts 12
cp 1
crap 2
rs 9.4286

1 Method

Rating   Name   Duplication   Size   Complexity  
A opcua.client.UASocketClient.disconnect_socket() 0 4 1
1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9 1
from functools import partial
10
11 1
from opcua import ua
12 1
from opcua.common import utils
13
14 1
class UASocketClient(object):
    """
    handle socket connection and send ua messages
    timeout is the timeout used while waiting for an ua answer from server
    """

    def __init__(self, timeout=1, security_policy=None):
        """
        timeout: seconds to wait on a response future (see send_request)
        security_policy: policy handed to ua.SecureConnection; when None a
        fresh ua.SecurityPolicy() is created for this instance
        """
        # A default written in the signature would be instantiated once, at
        # definition time, and shared by every client; build it per instance.
        if security_policy is None:
            security_policy = ua.SecurityPolicy()
        self.logger = logging.getLogger(__name__ + "Socket")
        self._thread = None
        # protects _callbackmap and the request id / handle counters
        self._lock = Lock()
        self.timeout = timeout
        self._socket = None
        self._do_stop = False
        self.authentication_token = ua.NodeId()
        self._request_id = 0
        self._request_handle = 0
        # request id -> Future resolved by the receiving thread
        self._callbackmap = {}
        self._connection = ua.SecureConnection(security_policy)

    def start(self):
        """
        Start receiving thread.
        this is called automatically in connect and
        should not be necessary to call directly
        """
        self._thread = Thread(target=self._run)
        self._thread.start()

    def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server, lower-level method
        timeout is the timeout written in ua header
        returns future
        """
        with self._lock:
            request.RequestHeader = self._create_request_header(timeout)
            try:
                binreq = request.to_binary()
            except BaseException:
                # reset request handle if any error
                # see self._create_request_header
                self._request_handle -= 1
                raise
            self._request_id += 1
            future = Future()
            if callback:
                future.add_done_callback(callback)
            # registered before writing so the receiving thread can always
            # find the future for this request id
            self._callbackmap[self._request_id] = future
            msg = self._connection.message_to_binary(binreq, message_type, self._request_id)
            self._socket.write(msg)
        return future

    def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server.
        timeout is the timeout written in ua header
        returns response object if no callback is provided; with a callback
        nothing is waited for and None is returned
        """
        future = self._send_request(request, callback, timeout, message_type)
        if not callback:
            # blocks for at most self.timeout seconds
            data = future.result(self.timeout)
            self.check_answer(data, " in response to " + request.__class__.__name__)
            return data

    def check_answer(self, data, context):
        """
        Inspect the type id at the start of a response.
        If it is a ServiceFault, log a warning (context is appended to the
        message), run the status check of its response header and return False
        when that check does not raise. Return True for any other type id.
        """
        # work on a copy; presumably so that parsing here does not consume
        # the caller's buffer - TODO confirm against the buffer type
        data = data.copy()
        typeid = ua.NodeId.from_binary(data)
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
            self.logger.warning("ServiceFault from server received %s", context)
            hdr = ua.ResponseHeader.from_binary(data)
            hdr.ServiceResult.check()
            return False
        return True

    def _run(self):
        """
        Body of the receiving thread: call _receive until a stop was
        requested or the socket reports that it has been closed.
        """
        self.logger.info("Thread started")
        while not self._do_stop:
            try:
                self._receive()
            # NOTE(review): the class is looked up through ua.utils, not the
            # utils module imported by this file - confirm they are the same
            except ua.utils.SocketClosedException:
                self.logger.info("Socket has closed connection")
                break
        self.logger.info("Thread ended")

    def _receive(self):
        """
        Read one message from the socket and dispatch it by type.
        Raises Exception for a message type that is not handled here.
        """
        msg = self._connection.receive_from_socket(self._socket)
        if msg is None:
            return
        elif isinstance(msg, ua.Message):
            self._call_callback(msg.request_id(), msg.body())
        elif isinstance(msg, ua.Acknowledge):
            # send_hello registers its future under id 0
            self._call_callback(0, msg)
        elif isinstance(msg, ua.ErrorMessage):
            self.logger.warning("Received an error: %s", msg)
        else:
            raise Exception("Unsupported message type: {}".format(msg))

    def _call_callback(self, request_id, body):
        """
        Resolve the future registered for request_id with body.
        Raises Exception when no future is registered for that id.
        """
        with self._lock:
            future = self._callbackmap.pop(request_id, None)
            if future is None:
                raise Exception("No future object found for request: {}, callbacks in list are {}".format(request_id, self._callbackmap.keys()))
        # set the result outside the lock: done-callbacks run from here
        future.set_result(body)

    def _create_request_header(self, timeout=1000):
        """
        Build a request header carrying the authentication token, the next
        request handle and timeout as TimeoutHint.
        """
        hdr = ua.RequestHeader()
        hdr.AuthenticationToken = self.authentication_token
        self._request_handle += 1
        hdr.RequestHandle = self._request_handle
        hdr.TimeoutHint = timeout
        return hdr

    def connect_socket(self, host, port):
        """
        connect to server socket and start receiving thread
        """
        self.logger.info("opening connection")
        sock = socket.create_connection((host, port))
        # nodelay necessary to avoid packing in one frame, some servers do not like it
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        self._socket = utils.SocketWrapper(sock)
        self.start()

    def disconnect_socket(self):
        """
        Ask the receiving thread to stop and shut down the sending side of
        the socket. The thread is not joined here.
        """
        self.logger.info("stop request")
        self._do_stop = True
        self._socket.socket.shutdown(socket.SHUT_WR)

    def send_hello(self, url):
        """
        Send a Hello message for url and return the acknowledge message,
        waiting at most self.timeout seconds for it.
        """
        hello = ua.Hello()
        hello.EndpointUrl = url
        future = Future()
        with self._lock:
            # the acknowledge is delivered under request id 0, see _receive
            self._callbackmap[0] = future
        binmsg = self._connection.tcp_to_binary(ua.MessageType.Hello, hello)
        self._socket.write(binmsg)
        ack = future.result(self.timeout)
        return ack

    def open_secure_channel(self, params):
        """
        Send an OpenSecureChannelRequest carrying params, install the
        returned security token on the connection and return the response
        parameters.
        """
        self.logger.info("open_secure_channel")
        request = ua.OpenSecureChannelRequest()
        request.Parameters = params
        future = self._send_request(request, message_type=ua.MessageType.SecureOpen)

        response = ua.OpenSecureChannelResponse.from_binary(future.result(self.timeout))
        response.ResponseHeader.ServiceResult.check()
        self._connection.set_security_token(response.Parameters.SecurityToken)
        return response.Parameters

    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepared to reconnect.
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close socket
        """
        self.logger.info("close_secure_channel")
        request = ua.CloseSecureChannelRequest()
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
        with self._lock:
            # don't expect any more answers
            future.cancel()
            self._callbackmap.clear()

        # some servers send a response here, most do not ... so we ignore
178 1
179 1
180 1
181 1
class BinaryClient(object):
    """
    low level OPC-UA client.
    implement all(well..one day) methods defined in opcua spec
    taking in argument the structures defined in opcua spec
    in python most of the structures are defined in
    uaprotocol_auto.py and uaprotocol_hand.py
    """

    def __init__(self, timeout=1):
        """
        timeout: seconds, handed to the socket client and used when waiting
        for subscription responses
        """
        self.logger = logging.getLogger(__name__)
        # _publishcallbacks should be accessed in recv thread only
        self._publishcallbacks = {}
        self._timeout = timeout
        # created by connect_socket; the service methods need it
        self._uasocket = None
        self._security_policy = ua.SecurityPolicy()

    def _request(self, request, response_class, check=True):
        """
        Send request, decode the answer with response_class.from_binary and,
        unless check is False, run the service result check of its header.
        Returns the decoded response.
        """
        data = self._uasocket.send_request(request)
        response = response_class.from_binary(data)
        if check:
            response.ResponseHeader.ServiceResult.check()
        return response

    def set_security(self, policy):
        """
        Set the security policy used by sockets created in connect_socket.
        """
        self._security_policy = policy

    def connect_socket(self, host, port):
        """
        connect to server socket and start receiving thread
        """
        self._uasocket = UASocketClient(self._timeout, security_policy=self._security_policy)
        return self._uasocket.connect_socket(host, port)

    def disconnect_socket(self):
        """
        Forward to the socket client's disconnect_socket.
        """
        return self._uasocket.disconnect_socket()

    def send_hello(self, url):
        """
        Forward to the socket client's send_hello and return its result.
        """
        return self._uasocket.send_hello(url)

    def open_secure_channel(self, params):
        """
        Forward to the socket client's open_secure_channel and return its result.
        """
        return self._uasocket.open_secure_channel(params)

    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepared to reconnect
        """
        return self._uasocket.close_secure_channel()

    def create_session(self, parameters):
        """
        Send a CreateSessionRequest, store the returned authentication token
        on the socket client and return the response parameters.
        """
        self.logger.info("create_session")
        request = ua.CreateSessionRequest()
        request.Parameters = parameters
        response = self._request(request, ua.CreateSessionResponse)
        self._uasocket.authentication_token = response.Parameters.AuthenticationToken
        return response.Parameters

    def activate_session(self, parameters):
        """
        Send an ActivateSessionRequest and return the response parameters.
        """
        self.logger.info("activate_session")
        request = ua.ActivateSessionRequest()
        request.Parameters = parameters
        return self._request(request, ua.ActivateSessionResponse).Parameters

    def close_session(self, deletesubscriptions):
        """
        Send a CloseSessionRequest. The response is decoded but its service
        result is not checked; nothing is returned.
        """
        self.logger.info("close_session")
        request = ua.CloseSessionRequest()
        request.DeleteSubscriptions = deletesubscriptions
        # service result check disabled, it seems we sent wrong session Id, but where is the sessionId supposed to be sent???
        self._request(request, ua.CloseSessionResponse, check=False)

    def browse(self, parameters):
        """
        Send a BrowseRequest and return the response results.
        """
        self.logger.info("browse")
        request = ua.BrowseRequest()
        request.Parameters = parameters
        return self._request(request, ua.BrowseResponse).Results

    def read(self, parameters):
        """
        Send a ReadRequest and return the response results.
        Good values read from NodeClass and ValueRank attributes are
        converted to the matching ua enum.
        """
        self.logger.info("read")
        request = ua.ReadRequest()
        request.Parameters = parameters
        response = self._request(request, ua.ReadResponse)
        # cast to Enum attributes that need to
        for idx, rv in enumerate(parameters.NodesToRead):
            if rv.AttributeId == ua.AttributeIds.NodeClass:
                dv = response.Results[idx]
                if dv.StatusCode.is_good():
                    dv.Value.Value = ua.NodeClass(dv.Value.Value)
            elif rv.AttributeId == ua.AttributeIds.ValueRank:
                dv = response.Results[idx]
                # only these values are converted; anything else stays a plain value
                if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
                    dv.Value.Value = ua.ValueRank(dv.Value.Value)
        return response.Results

    def write(self, params):
        """
        Send a WriteRequest and return the response results.
        """
        self.logger.info("write")
        request = ua.WriteRequest()
        request.Parameters = params
        return self._request(request, ua.WriteResponse).Results

    def get_endpoints(self, params):
        """
        Send a GetEndpointsRequest and return the response endpoints.
        """
        self.logger.info("get_endpoint")
        request = ua.GetEndpointsRequest()
        request.Parameters = params
        return self._request(request, ua.GetEndpointsResponse).Endpoints

    def find_servers(self, params):
        """
        Send a FindServersRequest and return the response servers.
        """
        self.logger.info("find_servers")
        request = ua.FindServersRequest()
        request.Parameters = params
        return self._request(request, ua.FindServersResponse).Servers

    def find_servers_on_network(self, params):
        """
        Send a FindServersOnNetworkRequest and return the response parameters.
        """
        self.logger.info("find_servers_on_network")
        request = ua.FindServersOnNetworkRequest()
        request.Parameters = params
        return self._request(request, ua.FindServersOnNetworkResponse).Parameters

    def register_server(self, registered_server):
        """
        Send a RegisterServerRequest for registered_server.
        """
        self.logger.info("register_server")
        request = ua.RegisterServerRequest()
        request.Server = registered_server
        self._request(request, ua.RegisterServerResponse)
        # nothing to return for this service

    def register_server2(self, params):
        """
        Send a RegisterServer2Request and return the configuration results.
        """
        self.logger.info("register_server2")
        request = ua.RegisterServer2Request()
        request.Parameters = params
        return self._request(request, ua.RegisterServer2Response).ConfigurationResults

    def translate_browsepaths_to_nodeids(self, browsepaths):
        """
        Send a TranslateBrowsePathsToNodeIdsRequest for browsepaths and
        return the response results.
        """
        self.logger.info("translate_browsepath_to_nodeid")
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
        request.Parameters.BrowsePaths = browsepaths
        return self._request(request, ua.TranslateBrowsePathsToNodeIdsResponse).Results

    def create_subscription(self, params, callback):
        """
        Send a CreateSubscriptionRequest and return the response parameters.
        callback is stored under the new subscription id and later invoked
        with the parameters of publish responses for that subscription.
        Waits at most the client timeout for the response.
        """
        self.logger.info("create_subscription")
        request = ua.CreateSubscriptionRequest()
        request.Parameters = params
        resp_fut = Future()
        # the response is handled where the request future is resolved, so
        # the callback map is only touched from there
        on_response = partial(self._create_subscription_callback, callback, resp_fut)
        self._uasocket.send_request(request, on_response)
        return resp_fut.result(self._timeout)

    def _create_subscription_callback(self, pub_callback, resp_fut, data_fut):
        """
        Done-callback of the create-subscription request: decode and check
        the response, register pub_callback and resolve resp_fut.
        """
        self.logger.info("_create_subscription_callback")
        data = data_fut.result()
        response = ua.CreateSubscriptionResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        self._publishcallbacks[response.Parameters.SubscriptionId] = pub_callback
        resp_fut.set_result(response.Parameters)

    def delete_subscriptions(self, subscriptionids):
        """
        Send a DeleteSubscriptionsRequest for subscriptionids and return the
        response results, waiting at most the client timeout.
        """
        self.logger.info("delete_subscription")
        request = ua.DeleteSubscriptionsRequest()
        request.Parameters.SubscriptionIds = subscriptionids
        resp_fut = Future()
        on_response = partial(self._delete_subscriptions_callback, subscriptionids, resp_fut)
        self._uasocket.send_request(request, on_response)
        return resp_fut.result(self._timeout)

    def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut):
        """
        Done-callback of the delete-subscriptions request: decode and check
        the response, drop the publish callbacks and resolve resp_fut.
        """
        self.logger.info("_delete_subscriptions_callback")
        data = data_fut.result()
        response = ua.DeleteSubscriptionsResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        for sid in subscriptionids:
            # tolerate ids without a registered callback: a KeyError raised in
            # a done-callback is swallowed by the future, resp_fut would never
            # be resolved and the caller would only see a timeout
            self._publishcallbacks.pop(sid, None)
        resp_fut.set_result(response.Results)

    def publish(self, acks=None):
        """
        Send a PublishRequest carrying acks (an empty list when None).
        The response is handled asynchronously by _call_publish_callback.
        """
        self.logger.info("publish")
        if acks is None:
            acks = []
        request = ua.PublishRequest()
        request.Parameters.SubscriptionAcknowledgements = acks
        # timeout could be set to 0 but some servers do not support it
        self._uasocket.send_request(request, self._call_publish_callback, timeout=int(9e8))

    def _call_publish_callback(self, future):
        """
        Done-callback of a publish request: decode the response and pass its
        parameters to the callback registered for the subscription.
        Exceptions raised by that callback are logged, not propagated.
        """
        self.logger.info("call_publish_callback")
        data = future.result()
        self._uasocket.check_answer(data, "ServiceFault received from server while waiting for publish response")
        response = ua.PublishResponse.from_binary(data)
        if response.Parameters.SubscriptionId not in self._publishcallbacks:
            self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
            return
        callback = self._publishcallbacks[response.Parameters.SubscriptionId]
        try:
            callback(response.Parameters)
        except Exception:  # we call client code, catch everything!
            self.logger.exception("Exception while calling user callback: %s", callback)

    def create_monitored_items(self, params):
        """
        Send a CreateMonitoredItemsRequest and return the response results.
        """
        self.logger.info("create_monitored_items")
        request = ua.CreateMonitoredItemsRequest()
        request.Parameters = params
        return self._request(request, ua.CreateMonitoredItemsResponse).Results

    def delete_monitored_items(self, params):
        """
        Send a DeleteMonitoredItemsRequest and return the response results.
        """
        self.logger.info("delete_monitored_items")
        request = ua.DeleteMonitoredItemsRequest()
        request.Parameters = params
        return self._request(request, ua.DeleteMonitoredItemsResponse).Results

    def add_nodes(self, nodestoadd):
        """
        Send an AddNodesRequest for nodestoadd and return the response results.
        """
        self.logger.info("add_nodes")
        request = ua.AddNodesRequest()
        request.Parameters.NodesToAdd = nodestoadd
        return self._request(request, ua.AddNodesResponse).Results

    def call(self, methodstocall):
        """
        Send a CallRequest for methodstocall and return the response results.
        """
        self.logger.info("call")
        request = ua.CallRequest()
        request.Parameters.MethodsToCall = methodstocall
        return self._request(request, ua.CallResponse).Results

    def history_read(self, params):
        """
        Send a HistoryReadRequest and return the response results.
        """
        self.logger.info("history_read")
        request = ua.HistoryReadRequest()
        request.Parameters = params
        return self._request(request, ua.HistoryReadResponse).Results
443