Completed
Pull Request — master (#133)
by Denis
02:23
created

opcua.client.UaClient.publish()   A

Complexity

Conditions 2

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 4.5185
Metric Value
cc 2
dl 0
loc 7
ccs 1
cts 7
cp 0.1429
crap 4.5185
rs 9.4285
1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9 1
from functools import partial
10
11 1
from opcua import ua
12 1
from opcua.common import utils
13
14
15 1
class UASocketClient(object):
    """
    handle socket connection and send ua messages
    timeout is the timeout used while waiting for an ua answer from server
    """

    def __init__(self, timeout=1, security_policy=None):
        """
        timeout is passed to Future.result() when waiting for an answer
        security_policy is handed to ua.SecureConnection; when omitted a
        new ua.SecurityPolicy() is created for this instance (a default
        built in the signature would be shared by every client)
        """
        if security_policy is None:
            security_policy = ua.SecurityPolicy()
        self.logger = logging.getLogger(__name__ + "Socket")
        self._thread = None
        # protects _callbackmap and the request id/handle counters
        self._lock = Lock()
        self.timeout = timeout
        self._socket = None
        self._do_stop = False
        self.authentication_token = ua.NodeId()
        self._request_id = 0
        self._request_handle = 0
        # request id -> Future completed by the receiving thread
        self._callbackmap = {}
        self._connection = ua.SecureConnection(security_policy)

    def start(self):
        """
        Start receiving thread.
        this is called automatically in connect and
        should not be necessary to call directly
        """
        self._thread = Thread(target=self._run)
        self._thread.start()

    def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server, lower-level method
        timeout is the timeout written in ua header
        returns future
        """
        with self._lock:
            request.RequestHeader = self._create_request_header(timeout)
            self.logger.debug("Sending: %s", request)
            try:
                binreq = request.to_binary()
            except BaseException:
                # reset request handle if any error
                # see self._create_request_header
                self._request_handle -= 1
                raise
            self._request_id += 1
            future = Future()
            if callback:
                future.add_done_callback(callback)
            self._callbackmap[self._request_id] = future
            msg = self._connection.message_to_binary(binreq, message_type, self._request_id)
            self._socket.write(msg)
        return future

    def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server.
        timeout is the timeout written in ua header
        returns response object if no callback is provided
        (with a callback nothing is returned, the callback gets the future)
        """
        future = self._send_request(request, callback, timeout, message_type)
        if not callback:
            data = future.result(self.timeout)
            self.check_answer(data, " in response to " + request.__class__.__name__)
            return data

    def check_answer(self, data, context):
        """
        inspect a copy of data for a ServiceFault.
        On a ServiceFault a warning is logged and the status code of the
        response header is checked; returns False if that check does not
        raise. Returns True for any other type id.
        """
        # work on a copy so that the caller can still parse data from the start
        data = data.copy()
        typeid = ua.NodeId.from_binary(data)
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
            self.logger.warning("ServiceFault from server received %s", context)
            hdr = ua.ResponseHeader.from_binary(data)
            hdr.ServiceResult.check()
            return False
        return True

    def _run(self):
        """
        receiving loop, target of the thread created in start()
        ends when _do_stop is set or the socket is reported closed
        """
        self.logger.info("Thread started")
        while not self._do_stop:
            try:
                self._receive()
            except ua.utils.SocketClosedException:
                self.logger.info("Socket has closed connection")
                break
            except ua.UaError:
                # one bad or unexpected message must not silently kill the
                # receiving thread: log it and keep reading
                self.logger.exception("Protocol Error")
        self.logger.info("Thread ended")

    def _receive(self):
        """
        read one message from the connection and dispatch it
        raises ua.UaError for a message type that is not handled here
        """
        msg = self._connection.receive_from_socket(self._socket)
        if msg is None:
            return
        elif isinstance(msg, ua.Message):
            self._call_callback(msg.request_id(), msg.body())
        elif isinstance(msg, ua.Acknowledge):
            # the future waiting for the acknowledge is stored under id 0 (see send_hello)
            self._call_callback(0, msg)
        elif isinstance(msg, ua.ErrorMessage):
            self.logger.warning("Received an error: %s", msg)
        else:
            raise ua.UaError("Unsupported message type: {}".format(msg))

    def _call_callback(self, request_id, body):
        """
        complete the future registered for request_id with body
        raises ua.UaError if no future is registered for that id
        """
        with self._lock:
            future = self._callbackmap.pop(request_id, None)
            if future is None:
                raise ua.UaError("No future object found for request: {}, callbacks in list are {}".format(request_id, self._callbackmap.keys()))
        # set the result outside the lock: done callbacks run here
        future.set_result(body)

    def _create_request_header(self, timeout=1000):
        """
        build a request header with the current authentication token,
        a newly incremented request handle and timeout as TimeoutHint
        """
        hdr = ua.RequestHeader()
        hdr.AuthenticationToken = self.authentication_token
        self._request_handle += 1
        hdr.RequestHandle = self._request_handle
        hdr.TimeoutHint = timeout
        return hdr

    def connect_socket(self, host, port):
        """
        connect to server socket and start receiving thread
        """
        self.logger.info("opening connection")
        sock = socket.create_connection((host, port))
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # nodelay necessary to avoid packing in one frame, some servers do not like it
        self._socket = utils.SocketWrapper(sock)
        self.start()

    def disconnect_socket(self):
        """
        ask the receiving loop to stop, then shut down and close the socket
        """
        self.logger.info("stop request")
        self._do_stop = True
        self._socket.socket.shutdown(socket.SHUT_WR)
        self._socket.socket.close()

    def send_hello(self, url):
        """
        send a Hello message for url and wait for the answer
        returns the message delivered for request id 0
        """
        hello = ua.Hello()
        hello.EndpointUrl = url
        future = Future()
        with self._lock:
            self._callbackmap[0] = future
        binmsg = self._connection.tcp_to_binary(ua.MessageType.Hello, hello)
        self._socket.write(binmsg)
        ack = future.result(self.timeout)
        return ack

    def open_secure_channel(self, params):
        """
        send an OpenSecureChannelRequest, check the service result,
        install the returned security token on the connection
        and return the response parameters
        """
        self.logger.info("open_secure_channel")
        request = ua.OpenSecureChannelRequest()
        request.Parameters = params
        future = self._send_request(request, message_type=ua.MessageType.SecureOpen)

        response = ua.OpenSecureChannelResponse.from_binary(future.result(self.timeout))
        response.ResponseHeader.ServiceResult.check()
        self._connection.set_security_token(response.Parameters.SecurityToken)
        return response.Parameters

    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepared to reconnect.
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close socket
        """
        self.logger.info("close_secure_channel")
        request = ua.CloseSecureChannelRequest()
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
        with self._lock:
            # don't expect any more answers
            future.cancel()
            self._callbackmap.clear()

        # some servers send a response here, most do not ... so we ignore
180
181 1
182
class UaClient(object):
183
184
    """
185
    low level OPC-UA client.
186
187
    It implements (almost) all methods defined in opcua spec
188
    taking in argument the structures defined in opcua spec.
189
190
    In this Python implementation most of the structures are defined in
191
    uaprotocol_auto.py and uaprotocol_hand.py available under opcua.ua
192
    """
193 1
194 1
    def __init__(self, timeout=1):
        """
        timeout is stored and later given to the UASocketClient created in
        connect_socket; it is also used when waiting for subscription answers
        """
        self.logger = logging.getLogger(__name__)
        self._timeout = timeout
        self._security_policy = ua.SecurityPolicy()
        # no socket client until connect_socket is called
        self._uasocket = None
        # subscription id -> user callback
        # _publishcallbacks should be accessed in recv thread only
        self._publishcallbacks = {}
201 1
202
    def set_security(self, policy):
        """
        store the security policy used for the socket client
        created by the next call to connect_socket
        """
        self._security_policy = policy
204 1
205
    def connect_socket(self, host, port):
        """
        create a new UASocketClient with the stored timeout and security
        policy, then connect it to host:port (which starts its receiving thread)
        """
        client = UASocketClient(self._timeout, security_policy=self._security_policy)
        self._uasocket = client
        return client.connect_socket(host, port)
211 1
212 1
    def disconnect_socket(self):
        """
        forward to disconnect_socket of the socket client and return its result
        """
        sock_client = self._uasocket
        return sock_client.disconnect_socket()
214 1
215 1
    def send_hello(self, url):
        """
        forward to send_hello of the socket client and return its answer
        """
        sock_client = self._uasocket
        return sock_client.send_hello(url)
217 1
218 1
    def open_secure_channel(self, params):
        """
        forward to open_secure_channel of the socket client and return its result
        """
        sock_client = self._uasocket
        return sock_client.open_secure_channel(params)
220 1
221
    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepared to reconnect
        """
        sock_client = self._uasocket
        return sock_client.close_secure_channel()
227 1
228 1
    def create_session(self, parameters):
        """
        send a CreateSessionRequest carrying parameters.
        After the service result has been checked, the returned
        authentication token is stored on the socket client.
        returns the response parameters
        """
        self.logger.info("create_session")
        req = ua.CreateSessionRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = ua.CreateSessionResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        session = resp.Parameters
        self._uasocket.authentication_token = session.AuthenticationToken
        return resp.Parameters
238 1
239 1
    def activate_session(self, parameters):
        """
        send an ActivateSessionRequest carrying parameters,
        check the service result and return the response parameters
        """
        self.logger.info("activate_session")
        req = ua.ActivateSessionRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = ua.ActivateSessionResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters
248 1
249 1
    def close_session(self, deletesubscriptions):
        """
        send a CloseSessionRequest with DeleteSubscriptions set to
        deletesubscriptions. The answer is parsed but its service result
        is not checked; nothing is returned
        """
        self.logger.info("close_session")
        req = ua.CloseSessionRequest()
        req.DeleteSubscriptions = deletesubscriptions
        raw = self._uasocket.send_request(req)
        # parsed only to validate the answer, the result is discarded
        ua.CloseSessionResponse.from_binary(raw)
        # response.ResponseHeader.ServiceResult.check() #disabled, it seems we sent wrong session Id, but where is the sessionId supposed to be sent???
256 1
257 1
    def browse(self, parameters):
        """
        send a BrowseRequest carrying parameters,
        check the service result and return the response results
        """
        self.logger.info("browse")
        req = ua.BrowseRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = ua.BrowseResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
266 1
267 1
    def read(self, parameters):
        """
        send a ReadRequest carrying parameters and return the response results.
        Good results for the NodeClass attribute are converted to ua.NodeClass;
        good results for the ValueRank attribute are converted to ua.ValueRank
        when the value is one of -3..4
        """
        self.logger.info("read")
        req = ua.ReadRequest()
        req.Parameters = parameters
        raw = self._uasocket.send_request(req)
        resp = ua.ReadResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        # cast to Enum the attributes that need it
        for pos, node_to_read in enumerate(parameters.NodesToRead):
            attr = node_to_read.AttributeId
            if attr == ua.AttributeIds.NodeClass:
                result = resp.Results[pos]
                if result.StatusCode.is_good():
                    result.Value.Value = ua.NodeClass(result.Value.Value)
            elif attr == ua.AttributeIds.ValueRank:
                result = resp.Results[pos]
                if not result.StatusCode.is_good():
                    continue
                if result.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
                    result.Value.Value = ua.ValueRank(result.Value.Value)
        return resp.Results
286 1
287 1
    def write(self, params):
        """
        send a WriteRequest carrying params,
        check the service result and return the response results
        """
        # was logged as "read", which made write calls indistinguishable in traces
        self.logger.info("write")
        request = ua.WriteRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.WriteResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results
296 1
297 1
    def get_endpoints(self, params):
        """
        send a GetEndpointsRequest carrying params,
        check the service result and return the endpoints of the response
        """
        self.logger.info("get_endpoint")
        req = ua.GetEndpointsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.GetEndpointsResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Endpoints
306 1
307 1
    def find_servers(self, params):
        """
        send a FindServersRequest carrying params,
        check the service result and return the servers of the response
        """
        self.logger.info("find_servers")
        req = ua.FindServersRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.FindServersResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Servers
316
317
    def find_servers_on_network(self, params):
        """
        send a FindServersOnNetworkRequest carrying params,
        check the service result and return the response parameters
        """
        self.logger.info("find_servers_on_network")
        req = ua.FindServersOnNetworkRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.FindServersOnNetworkResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Parameters
326
327 1
    def register_server(self, registered_server):
        """
        send a RegisterServerRequest whose Server is registered_server
        and check the service result of the answer
        """
        self.logger.info("register_server")
        req = ua.RegisterServerRequest()
        req.Server = registered_server
        raw = self._uasocket.send_request(req)
        resp = ua.RegisterServerResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        # nothing to return for this service
336 1
337 1
    def register_server2(self, params):
        """
        send a RegisterServer2Request carrying params,
        check the service result and return the configuration results
        """
        self.logger.info("register_server2")
        req = ua.RegisterServer2Request()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.RegisterServer2Response.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.ConfigurationResults
346 1
347 1
    def translate_browsepaths_to_nodeids(self, browsepaths):
        """
        send a TranslateBrowsePathsToNodeIdsRequest for browsepaths,
        check the service result and return the response results
        """
        self.logger.info("translate_browsepath_to_nodeid")
        req = ua.TranslateBrowsePathsToNodeIdsRequest()
        req.Parameters.BrowsePaths = browsepaths
        raw = self._uasocket.send_request(req)
        resp = ua.TranslateBrowsePathsToNodeIdsResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
356 1
357 1
    def create_subscription(self, params, callback):
        """
        send a CreateSubscriptionRequest carrying params.
        The answer is handled by _create_subscription_callback, which
        registers callback for the new subscription; this method waits up
        to the client timeout for that handler and returns its result
        """
        self.logger.info("create_subscription")
        req = ua.CreateSubscriptionRequest()
        req.Parameters = params
        pending = Future()
        on_answer = partial(self._create_subscription_callback, callback, pending)
        self._uasocket.send_request(req, on_answer)
        return pending.result(self._timeout)
365 1
366 1
    def _create_subscription_callback(self, pub_callback, resp_fut, data_fut):
367 1
        self.logger.info("_create_subscription_callback")
368 1
        data = data_fut.result()
369 1
        response = ua.CreateSubscriptionResponse.from_binary(data)
370
        self.logger.debug(response)
371 1
        response.ResponseHeader.ServiceResult.check()
372 1
        self._publishcallbacks[response.Parameters.SubscriptionId] = pub_callback
373 1
        resp_fut.set_result(response.Parameters)
374 1
375 1
    def delete_subscriptions(self, subscriptionids):
        """
        send a DeleteSubscriptionsRequest for subscriptionids.
        The answer is handled by _delete_subscriptions_callback; this method
        waits up to the client timeout for that handler and returns its result
        """
        self.logger.info("delete_subscription")
        req = ua.DeleteSubscriptionsRequest()
        req.Parameters.SubscriptionIds = subscriptionids
        pending = Future()
        on_answer = partial(self._delete_subscriptions_callback, subscriptionids, pending)
        self._uasocket.send_request(req, on_answer)
        return pending.result(self._timeout)
383 1
384 1
    def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut):
385 1
        self.logger.info("_delete_subscriptions_callback")
386 1
        data = data_fut.result()
387
        response = ua.DeleteSubscriptionsResponse.from_binary(data)
388 1
        self.logger.debug(response)
389 1
        response.ResponseHeader.ServiceResult.check()
390 1
        for sid in subscriptionids:
391 1
            self._publishcallbacks.pop(sid)
392 1
        resp_fut.set_result(response.Results)
393 1
394
    def publish(self, acks=None):
        """
        send a PublishRequest acknowledging acks (an empty list when omitted).
        The answer is delivered to _call_publish_callback; nothing is returned
        """
        self.logger.info("publish")
        acks = [] if acks is None else acks
        req = ua.PublishRequest()
        req.Parameters.SubscriptionAcknowledgements = acks
        # timeout could be set to 0 but some servers do not support it
        self._uasocket.send_request(req, self._call_publish_callback, timeout=int(9e8))
401 1
402 1
    def _call_publish_callback(self, future):
        """
        done-callback for the future of a PublishRequest.
        The answer is checked for a ServiceFault, parsed as a PublishResponse
        and its parameters are passed to the callback registered for the
        subscription id. Exceptions raised by that user callback are logged
        and not propagated
        """
        self.logger.info("call_publish_callback")
        data = future.result()
        self._uasocket.check_answer(data, "ServiceFault received from server while waiting for publish response")
        try:
            response = ua.PublishResponse.from_binary(data)
            self.logger.debug(response)
        except Exception:
            self.logger.exception("Error parsing notification from server")
            # send a new publish request (presumably so that the server does not stop sending notifications)
            self.publish([])
            return
        if response.Parameters.SubscriptionId not in self._publishcallbacks:
            self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
            return
        callback = self._publishcallbacks[response.Parameters.SubscriptionId]
        try:
            callback(response.Parameters)
        except Exception:  # we call client code, catch everything!
            # the original message had a %s placeholder but no argument
            self.logger.exception("Exception while calling user callback: %s", callback)
421 1
422 1
    def create_monitored_items(self, params):
        """
        send a CreateMonitoredItemsRequest carrying params,
        check the service result and return the response results
        """
        self.logger.info("create_monitored_items")
        req = ua.CreateMonitoredItemsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.CreateMonitoredItemsResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
431 1
432 1
    def delete_monitored_items(self, params):
        """
        send a DeleteMonitoredItemsRequest carrying params,
        check the service result and return the response results
        """
        self.logger.info("delete_monitored_items")
        req = ua.DeleteMonitoredItemsRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.DeleteMonitoredItemsResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
441 1
442
    def add_nodes(self, nodestoadd):
        """
        send an AddNodesRequest for nodestoadd,
        check the service result and return the response results
        """
        self.logger.info("add_nodes")
        req = ua.AddNodesRequest()
        req.Parameters.NodesToAdd = nodestoadd
        raw = self._uasocket.send_request(req)
        resp = ua.AddNodesResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
451 1
452
    def delete_nodes(self, nodestodelete):
        """
        send a DeleteNodesRequest for nodestodelete,
        check the service result and return the response results
        """
        self.logger.info("delete_nodes")
        req = ua.DeleteNodesRequest()
        req.Parameters.NodesToDelete = nodestodelete
        raw = self._uasocket.send_request(req)
        resp = ua.DeleteNodesResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
461
462
    def call(self, methodstocall):
        """
        send a CallRequest for methodstocall,
        check the service result and return the response results
        """
        # every other service method logs its name at info level; this one did not
        self.logger.info("call")
        request = ua.CallRequest()
        request.Parameters.MethodsToCall = methodstocall
        data = self._uasocket.send_request(request)
        response = ua.CallResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results
470
471
    def history_read(self, params):
        """
        send a HistoryReadRequest carrying params,
        check the service result and return the response results
        """
        self.logger.info("history_read")
        req = ua.HistoryReadRequest()
        req.Parameters = params
        raw = self._uasocket.send_request(req)
        resp = ua.HistoryReadResponse.from_binary(raw)
        self.logger.debug(resp)
        resp.ResponseHeader.ServiceResult.check()
        return resp.Results
480