Completed
Pull Request — master (#163)
by
unknown
02:50
created

UASocketClient.send_hello()   A

Complexity

Conditions 2

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 2
Metric Value
cc 2
dl 0
loc 10
rs 9.4285
ccs 10
cts 10
cp 1
crap 2
1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9 1
from functools import partial
10
11 1
from opcua import ua
12 1
from opcua.common import utils
13
14
15 1
class UASocketClient(object):
16
    """
17
    handle socket connection and send ua messages
18
    timeout is the timeout used while waiting for an ua answer from server
19
    """
20 1
    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy()):
21 1
        self.logger = logging.getLogger(__name__ + ".Socket")
22 1
        self._thread = None
23 1
        self._lock = Lock()
24 1
        self.timeout = timeout
25 1
        self._socket = None
26 1
        self._do_stop = False
27 1
        self.authentication_token = ua.NodeId()
28 1
        self._request_id = 0
29 1
        self._request_handle = 0
30 1
        self._callbackmap = {}
31 1
        self._connection = ua.SecureConnection(security_policy)
32
33 1
    def start(self):
34
        """
35
        Start receiving thread.
36
        this is called automatically in connect and
37
        should not be necessary to call directly
38
        """
39 1
        self._thread = Thread(target=self._run)
40 1
        self._thread.daemon = True
41 1
        self._thread.start()
42
43 1
    def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
44
        """
45
        send request to server, lower-level method
46
        timeout is the timeout written in ua header
47
        returns future
48
        """
49 1
        with self._lock:
50 1
            request.RequestHeader = self._create_request_header(timeout)
51 1
            self.logger.debug("Sending: %s", request)
52 1
            try:
53 1
                binreq = request.to_binary()
54
            except:
55
                # reset reqeust handle if any error
56
                # see self._create_request_header
57
                self._request_handle -= 1
58
                raise
59 1
            self._request_id += 1
60 1
            future = Future()
61 1
            if callback:
62 1
                future.add_done_callback(callback)
63 1
            self._callbackmap[self._request_id] = future
64 1
            msg = self._connection.message_to_binary(binreq, message_type, self._request_id)
65 1
            self._socket.write(msg)
66 1
        return future
67
68 1
    def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
69
        """
70
        send request to server.
71
        timeout is the timeout written in ua header
72
        returns response object if no callback is provided
73
        """
74 1
        future = self._send_request(request, callback, timeout, message_type)
75 1
        if not callback:
76 1
            data = future.result(self.timeout)
77 1
            self.check_answer(data, " in response to " + request.__class__.__name__)
78 1
            return data
79
80 1
    def check_answer(self, data, context):
81 1
        data = data.copy()
82 1
        typeid = ua.NodeId.from_binary(data)
83 1
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
84 1
            self.logger.warning("ServiceFault from server received %s", context)
85 1
            hdr = ua.ResponseHeader.from_binary(data)
86 1
            hdr.ServiceResult.check()
87
            return False
88 1
        return True
89
90 1
    def _run(self):
91 1
        self.logger.info("Thread started")
92 1
        while not self._do_stop:
93 1
            try:
94 1
                self._receive()
95 1
            except ua.utils.SocketClosedException:
96 1
                self.logger.info("Socket has closed connection")
97 1
                break
98 1
        self.logger.info("Thread ended")
99
100 1
    def _receive(self):
101 1
        msg = self._connection.receive_from_socket(self._socket)
102 1
        if msg is None:
103
            return
104 1
        elif isinstance(msg, ua.Message):
105 1
            self._call_callback(msg.request_id(), msg.body())
106 1
        elif isinstance(msg, ua.Acknowledge):
107 1
            self._call_callback(0, msg)
108
        elif isinstance(msg, ua.ErrorMessage):
109
            self.logger.warning("Received an error: %s", msg)
110
        else:
111
            raise ua.UaError("Unsupported message type: %s", msg)
112
113 1
    def _call_callback(self, request_id, body):
114 1
        with self._lock:
115 1
            future = self._callbackmap.pop(request_id, None)
116 1
            if future is None:
117
                raise ua.UaError("No future object found for request: {}, callbacks in list are {}".format(request_id, self._callbackmap.keys()))
118 1
        future.set_result(body)
119
120 1
    def _create_request_header(self, timeout=1000):
121 1
        hdr = ua.RequestHeader()
122 1
        hdr.AuthenticationToken = self.authentication_token
123 1
        self._request_handle += 1
124 1
        hdr.RequestHandle = self._request_handle
125 1
        hdr.TimeoutHint = timeout
126 1
        return hdr
127
128 1
    def connect_socket(self, host, port):
129
        """
130
        connect to server socket and start receiving thread
131
        """
132 1
        self.logger.info("opening connection")
133 1
        sock = socket.create_connection((host, port))
134 1
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # nodelay ncessary to avoid packing in one frame, some servers do not like it
135 1
        self._socket = utils.SocketWrapper(sock)
136 1
        self.start()
137
138 1
    def disconnect_socket(self):
139 1
        self.logger.info("stop request")
140 1
        self._do_stop = True
141 1
        self._socket.socket.shutdown(socket.SHUT_WR)
142 1
        self._socket.socket.close()
143
144 1
    def send_hello(self, url):
145 1
        hello = ua.Hello()
146 1
        hello.EndpointUrl = url
147 1
        future = Future()
148 1
        with self._lock:
149 1
            self._callbackmap[0] = future
150 1
        binmsg = self._connection.tcp_to_binary(ua.MessageType.Hello, hello)
151 1
        self._socket.write(binmsg)
152 1
        ack = future.result(self.timeout)
153 1
        return ack
154
155 1
    def open_secure_channel(self, params):
156 1
        self.logger.info("open_secure_channel")
157 1
        request = ua.OpenSecureChannelRequest()
158 1
        request.Parameters = params
159 1
        future = self._send_request(request, message_type=ua.MessageType.SecureOpen)
160
161 1
        response = ua.OpenSecureChannelResponse.from_binary(future.result(self.timeout))
162 1
        response.ResponseHeader.ServiceResult.check()
163 1
        self._connection.set_security_token(response.Parameters.SecurityToken)
164 1
        return response.Parameters
165
166 1
    def close_secure_channel(self):
167
        """
168
        close secure channel. It seems to trigger a shutdown of socket
169
        in most servers, so be prepare to reconnect.
170
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close socket
171
        """
172 1
        self.logger.info("close_secure_channel")
173 1
        request = ua.CloseSecureChannelRequest()
174 1
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
175 1
        with self._lock:
176
            # don't expect any more answers
177 1
            future.cancel()
178 1
            self._callbackmap.clear()
179
180
        # some servers send a response here, most do not ... so we ignore
181
182
183 1
class UaClient(object):
184
185
    """
186
    low level OPC-UA client.
187
188
    It implements (almost) all methods defined in opcua spec
189
    taking in argument the structures defined in opcua spec.
190
191
    In this Python implementation  most of the structures are defined in
192
    uaprotocol_auto.py and uaprotocol_hand.py available under opcua.ua
193
    """
194
195 1
    def __init__(self, timeout=1):
196 1
        self.logger = logging.getLogger(__name__)
197
        # _publishcallbacks should be accessed in recv thread only
198 1
        self._publishcallbacks = {}
199 1
        self._timeout = timeout
200 1
        self._uasocket = None
201 1
        self._security_policy = ua.SecurityPolicy()
202
203 1
    def set_security(self, policy):
204
        self._security_policy = policy
205
206 1
    def connect_socket(self, host, port):
207
        """
208
        connect to server socket and start receiving thread
209
        """
210 1
        self._uasocket = UASocketClient(self._timeout, security_policy=self._security_policy)
211 1
        return self._uasocket.connect_socket(host, port)
212
213 1
    def disconnect_socket(self):
214 1
        return self._uasocket.disconnect_socket()
215
216 1
    def send_hello(self, url):
217 1
        return self._uasocket.send_hello(url)
218
219 1
    def open_secure_channel(self, params):
220 1
        return self._uasocket.open_secure_channel(params)
221
222 1
    def close_secure_channel(self):
223
        """
224
        close secure channel. It seems to trigger a shutdown of socket
225
        in most servers, so be prepare to reconnect
226
        """
227 1
        return self._uasocket.close_secure_channel()
228
229 1
    def create_session(self, parameters):
230 1
        self.logger.info("create_session")
231 1
        request = ua.CreateSessionRequest()
232 1
        request.Parameters = parameters
233 1
        data = self._uasocket.send_request(request)
234 1
        response = ua.CreateSessionResponse.from_binary(data)
235 1
        self.logger.debug(response)
236 1
        response.ResponseHeader.ServiceResult.check()
237 1
        self._uasocket.authentication_token = response.Parameters.AuthenticationToken
238 1
        return response.Parameters
239
240 1
    def activate_session(self, parameters):
241 1
        self.logger.info("activate_session")
242 1
        request = ua.ActivateSessionRequest()
243 1
        request.Parameters = parameters
244 1
        data = self._uasocket.send_request(request)
245 1
        response = ua.ActivateSessionResponse.from_binary(data)
246 1
        self.logger.debug(response)
247 1
        response.ResponseHeader.ServiceResult.check()
248 1
        return response.Parameters
249
250 1
    def close_session(self, deletesubscriptions):
251 1
        self.logger.info("close_session")
252 1
        request = ua.CloseSessionRequest()
253 1
        request.DeleteSubscriptions = deletesubscriptions
254 1
        data = self._uasocket.send_request(request)
255 1
        ua.CloseSessionResponse.from_binary(data)
256
        # response.ResponseHeader.ServiceResult.check() #disabled, it seems we sent wrong session Id, but where is the sessionId supposed to be sent???
257
258 1
    def browse(self, parameters):
259 1
        self.logger.info("browse")
260 1
        request = ua.BrowseRequest()
261 1
        request.Parameters = parameters
262 1
        data = self._uasocket.send_request(request)
263 1
        response = ua.BrowseResponse.from_binary(data)
264 1
        self.logger.debug(response)
265 1
        response.ResponseHeader.ServiceResult.check()
266 1
        return response.Results
267
268 1
    def read(self, parameters):
269 1
        self.logger.info("read")
270 1
        request = ua.ReadRequest()
271 1
        request.Parameters = parameters
272 1
        data = self._uasocket.send_request(request)
273 1
        response = ua.ReadResponse.from_binary(data)
274 1
        self.logger.debug(response)
275 1
        response.ResponseHeader.ServiceResult.check()
276
        # cast to Enum attributes that need to
277 1
        for idx, rv in enumerate(parameters.NodesToRead):
278 1
            if rv.AttributeId == ua.AttributeIds.NodeClass:
279
                dv = response.Results[idx]
280
                if dv.StatusCode.is_good():
281
                    dv.Value.Value = ua.NodeClass(dv.Value.Value)
282 1
            elif rv.AttributeId == ua.AttributeIds.ValueRank:
283 1
                dv = response.Results[idx]
284 1
                if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
285 1
                    dv.Value.Value = ua.ValueRank(dv.Value.Value)
286 1
        return response.Results
287
288 1
    def write(self, params):
289 1
        self.logger.info("read")
290 1
        request = ua.WriteRequest()
291 1
        request.Parameters = params
292 1
        data = self._uasocket.send_request(request)
293 1
        response = ua.WriteResponse.from_binary(data)
294 1
        self.logger.debug(response)
295 1
        response.ResponseHeader.ServiceResult.check()
296 1
        return response.Results
297
298 1
    def get_endpoints(self, params):
299 1
        self.logger.info("get_endpoint")
300 1
        request = ua.GetEndpointsRequest()
301 1
        request.Parameters = params
302 1
        data = self._uasocket.send_request(request)
303 1
        response = ua.GetEndpointsResponse.from_binary(data)
304 1
        self.logger.debug(response)
305 1
        response.ResponseHeader.ServiceResult.check()
306 1
        return response.Endpoints
307
308 1
    def find_servers(self, params):
309 1
        self.logger.info("find_servers")
310 1
        request = ua.FindServersRequest()
311 1
        request.Parameters = params
312 1
        data = self._uasocket.send_request(request)
313 1
        response = ua.FindServersResponse.from_binary(data)
314 1
        self.logger.debug(response)
315 1
        response.ResponseHeader.ServiceResult.check()
316 1
        return response.Servers
317
318 1
    def find_servers_on_network(self, params):
319
        self.logger.info("find_servers_on_network")
320
        request = ua.FindServersOnNetworkRequest()
321
        request.Parameters = params
322
        data = self._uasocket.send_request(request)
323
        response = ua.FindServersOnNetworkResponse.from_binary(data)
324
        self.logger.debug(response)
325
        response.ResponseHeader.ServiceResult.check()
326
        return response.Parameters
327
328 1
    def register_server(self, registered_server):
329 1
        self.logger.info("register_server")
330 1
        request = ua.RegisterServerRequest()
331 1
        request.Server = registered_server
332 1
        data = self._uasocket.send_request(request)
333 1
        response = ua.RegisterServerResponse.from_binary(data)
334 1
        self.logger.debug(response)
335 1
        response.ResponseHeader.ServiceResult.check()
336
        # nothing to return for this service
337
338 1
    def register_server2(self, params):
339
        self.logger.info("register_server2")
340
        request = ua.RegisterServer2Request()
341
        request.Parameters = params
342
        data = self._uasocket.send_request(request)
343
        response = ua.RegisterServer2Response.from_binary(data)
344
        self.logger.debug(response)
345
        response.ResponseHeader.ServiceResult.check()
346
        return response.ConfigurationResults
347
348 1
    def translate_browsepaths_to_nodeids(self, browsepaths):
349 1
        self.logger.info("translate_browsepath_to_nodeid")
350 1
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
351 1
        request.Parameters.BrowsePaths = browsepaths
352 1
        data = self._uasocket.send_request(request)
353 1
        response = ua.TranslateBrowsePathsToNodeIdsResponse.from_binary(data)
354 1
        self.logger.debug(response)
355 1
        response.ResponseHeader.ServiceResult.check()
356 1
        return response.Results
357
358 1
    def create_subscription(self, params, callback):
359 1
        self.logger.info("create_subscription")
360 1
        request = ua.CreateSubscriptionRequest()
361 1
        request.Parameters = params
362 1
        resp_fut = Future()
363 1
        mycallbak = partial(self._create_subscription_callback, callback, resp_fut)
364 1
        self._uasocket.send_request(request, mycallbak)
365 1
        return resp_fut.result(self._timeout)
366
367 1
    def _create_subscription_callback(self, pub_callback, resp_fut, data_fut):
368 1
        self.logger.info("_create_subscription_callback")
369 1
        data = data_fut.result()
370 1
        response = ua.CreateSubscriptionResponse.from_binary(data)
371 1
        self.logger.debug(response)
372 1
        response.ResponseHeader.ServiceResult.check()
373 1
        self._publishcallbacks[response.Parameters.SubscriptionId] = pub_callback
374 1
        resp_fut.set_result(response.Parameters)
375
376 1
    def delete_subscriptions(self, subscriptionids):
377 1
        self.logger.info("delete_subscription")
378 1
        request = ua.DeleteSubscriptionsRequest()
379 1
        request.Parameters.SubscriptionIds = subscriptionids
380 1
        resp_fut = Future()
381 1
        mycallbak = partial(self._delete_subscriptions_callback, subscriptionids, resp_fut)
382 1
        self._uasocket.send_request(request, mycallbak)
383 1
        return resp_fut.result(self._timeout)
384
385 1
    def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut):
386 1
        self.logger.info("_delete_subscriptions_callback")
387 1
        data = data_fut.result()
388 1
        response = ua.DeleteSubscriptionsResponse.from_binary(data)
389 1
        self.logger.debug(response)
390 1
        response.ResponseHeader.ServiceResult.check()
391 1
        for sid in subscriptionids:
392 1
            self._publishcallbacks.pop(sid)
393 1
        resp_fut.set_result(response.Results)
394
395 1
    def publish(self, acks=None):
396 1
        self.logger.info("publish")
397 1
        if acks is None:
398 1
            acks = []
399 1
        request = ua.PublishRequest()
400 1
        request.Parameters.SubscriptionAcknowledgements = acks
401 1
        self._uasocket.send_request(request, self._call_publish_callback, timeout=int(9e8))  # timeout could be set to 0 but some servers to not support it
402
403 1
    def _call_publish_callback(self, future):
404 1
        self.logger.info("call_publish_callback")
405 1
        data = future.result()
406 1
        self._uasocket.check_answer(data, "ServiceFault received from server while waiting for publish response")
407 1
        try:
408 1
            response = ua.PublishResponse.from_binary(data)
409 1
            self.logger.debug(response)
410
        except Exception:
411
            self.logger.exception("Error parsing notificatipn from server")
412
            self.publish([]) #send publish request ot server so he does stop sending notifications
413
            return
414 1
        if response.Parameters.SubscriptionId not in self._publishcallbacks:
415
            self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
416
            return
417 1
        callback = self._publishcallbacks[response.Parameters.SubscriptionId]
418 1
        try:
419 1
            callback(response.Parameters)
420
        except Exception:  # we call client code, catch everything!
421
            self.logger.exception("Exception while calling user callback: %s")
422
423 1
    def create_monitored_items(self, params):
424 1
        self.logger.info("create_monitored_items")
425 1
        request = ua.CreateMonitoredItemsRequest()
426 1
        request.Parameters = params
427 1
        data = self._uasocket.send_request(request)
428 1
        response = ua.CreateMonitoredItemsResponse.from_binary(data)
429 1
        self.logger.debug(response)
430 1
        response.ResponseHeader.ServiceResult.check()
431 1
        return response.Results
432
433 1
    def delete_monitored_items(self, params):
434 1
        self.logger.info("delete_monitored_items")
435 1
        request = ua.DeleteMonitoredItemsRequest()
436 1
        request.Parameters = params
437 1
        data = self._uasocket.send_request(request)
438 1
        response = ua.DeleteMonitoredItemsResponse.from_binary(data)
439 1
        self.logger.debug(response)
440 1
        response.ResponseHeader.ServiceResult.check()
441 1
        return response.Results
442
443 1
    def add_nodes(self, nodestoadd):
444 1
        self.logger.info("add_nodes")
445 1
        request = ua.AddNodesRequest()
446 1
        request.Parameters.NodesToAdd = nodestoadd
447 1
        data = self._uasocket.send_request(request)
448 1
        response = ua.AddNodesResponse.from_binary(data)
449 1
        self.logger.debug(response)
450 1
        response.ResponseHeader.ServiceResult.check()
451 1
        return response.Results
452
453 1
    def delete_nodes(self, nodestodelete):
454 1
        self.logger.info("delete_nodes")
455 1
        request = ua.DeleteNodesRequest()
456 1
        request.Parameters.NodesToDelete = nodestodelete
457 1
        data = self._uasocket.send_request(request)
458 1
        response = ua.DeleteNodesResponse.from_binary(data)
459 1
        self.logger.debug(response)
460 1
        response.ResponseHeader.ServiceResult.check()
461 1
        return response.Results
462
463 1
    def call(self, methodstocall):
464 1
        request = ua.CallRequest()
465 1
        request.Parameters.MethodsToCall = methodstocall
466 1
        data = self._uasocket.send_request(request)
467 1
        response = ua.CallResponse.from_binary(data)
468 1
        self.logger.debug(response)
469 1
        response.ResponseHeader.ServiceResult.check()
470 1
        return response.Results
471
472 1
    def history_read(self, params):
473
        self.logger.info("history_read")
474
        request = ua.HistoryReadRequest()
475
        request.Parameters = params
476
        data = self._uasocket.send_request(request)
477
        response = ua.HistoryReadResponse.from_binary(data)
478
        self.logger.debug(response)
479
        response.ResponseHeader.ServiceResult.check()
480
        return response.Results
481