Completed
Pull Request — master (#124)
by Olivier
09:26 queued 02:10
created

opcua.client.UaClient.set_security()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 2
rs 10
1
"""
2
Low level binary client
3
"""
4
5
import logging
6
import socket
7
from threading import Thread, Lock
8
from concurrent.futures import Future
9
from functools import partial
10
11
from opcua import ua
12
from opcua.common import utils
13
14
15
class UASocketClient(object):
    """
    handle socket connection and send ua messages
    timeout is the timeout used while waiting for an ua answer from server
    """

    def __init__(self, timeout=1, security_policy=None):
        """
        Prepare the client state; no socket is opened here.

        timeout is the number of seconds to wait for an answer from server
        security_policy is handed to ua.SecureConnection; when None, a
        fresh ua.SecurityPolicy() is created for this instance
        """
        # A default written as ``ua.SecurityPolicy()`` in the signature is
        # evaluated only once, at definition time, so every client created
        # without an explicit policy would share one and the same object.
        if security_policy is None:
            security_policy = ua.SecurityPolicy()
        self.logger = logging.getLogger(__name__ + "Socket")
        self._thread = None
        # protects _request_id, _request_handle and _callbackmap
        self._lock = Lock()
        self.timeout = timeout
        self._socket = None
        self._do_stop = False
        self.authentication_token = ua.NodeId()
        self._request_id = 0
        self._request_handle = 0
        # request id -> Future completed by the receiving thread
        self._callbackmap = {}
        self._connection = ua.SecureConnection(security_policy)

    def start(self):
        """
        Start receiving thread.
        this is called automatically in connect and
        should not be necessary to call directly
        """
        self._thread = Thread(target=self._run)
        self._thread.start()

    def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server, lower-level method
        timeout is the timeout written in ua header
        callback, if given, is attached to the future as a done callback
        returns future
        """
        # the lock is held while handle/id are allocated, the future is
        # registered and the message is written to the socket
        with self._lock:
            request.RequestHeader = self._create_request_header(timeout)
            try:
                binreq = request.to_binary()
            except:
                # reset request handle if any error
                # see self._create_request_header
                self._request_handle -= 1
                raise
            self._request_id += 1
            future = Future()
            if callback:
                future.add_done_callback(callback)
            self._callbackmap[self._request_id] = future
            msg = self._connection.message_to_binary(binreq, message_type, self._request_id)
            self._socket.write(msg)
        return future

    def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server.
        timeout is the timeout written in ua header
        returns response object if no callback is provided;
        with a callback nothing is returned and the callback receives
        the future once the answer has arrived
        """
        future = self._send_request(request, callback, timeout, message_type)
        if not callback:
            # block for at most self.timeout seconds waiting for the answer
            data = future.result(self.timeout)
            self.check_answer(data, " in response to " + request.__class__.__name__)
            return data

    def check_answer(self, data, context):
        """
        Look at the type id at the start of data to detect a ServiceFault.

        context is appended to the warning that is logged for a fault.
        returns True when the answer is not a ServiceFault, False when it
        is one and the ServiceResult check of its header did not raise
        """
        # decode from a copy, presumably so that the position of the
        # caller's buffer is left untouched for the real decoding
        data = data.copy()
        typeid = ua.NodeId.from_binary(data)
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
            self.logger.warning("ServiceFault from server received %s", context)
            hdr = ua.ResponseHeader.from_binary(data)
            hdr.ServiceResult.check()
            return False
        return True

    def _run(self):
        """
        Body of the receiving thread: process incoming messages until a
        stop is requested or the socket is reported closed
        """
        self.logger.info("Thread started")
        while not self._do_stop:
            try:
                self._receive()
            except ua.utils.SocketClosedException:
                self.logger.info("Socket has closed connection")
                break
        self.logger.info("Thread ended")

    def _receive(self):
        """
        Read one message from the connection and dispatch it to the
        future waiting for it. Raises ua.UaError on an unsupported message
        """
        msg = self._connection.receive_from_socket(self._socket)
        if msg is None:
            return
        elif isinstance(msg, ua.Message):
            self._call_callback(msg.request_id(), msg.body())
        elif isinstance(msg, ua.Acknowledge):
            # the answer to Hello is registered under request id 0,
            # see send_hello
            self._call_callback(0, msg)
        elif isinstance(msg, ua.ErrorMessage):
            self.logger.warning("Received an error: {}".format(msg))
        else:
            raise ua.UaError("Unsupported message type: {}".format(msg))

    def _call_callback(self, request_id, body):
        """
        Complete the future registered for request_id with body.
        Raises ua.UaError when no future is registered for that id
        """
        with self._lock:
            future = self._callbackmap.pop(request_id, None)
            if future is None:
                raise ua.UaError("No future object found for request: {}, callbacks in list are {}".format(request_id, self._callbackmap.keys()))
        # set the result outside the lock: done callbacks run from here
        future.set_result(body)

    def _create_request_header(self, timeout=1000):
        """
        Build a request header carrying the authentication token, the
        next request handle and timeout as TimeoutHint
        """
        hdr = ua.RequestHeader()
        hdr.AuthenticationToken = self.authentication_token
        self._request_handle += 1
        hdr.RequestHandle = self._request_handle
        hdr.TimeoutHint = timeout
        return hdr

    def connect_socket(self, host, port):
        """
        connect to server socket and start receiving thread
        """
        self.logger.info("opening connection")
        sock = socket.create_connection((host, port))
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # nodelay necessary to avoid packing in one frame, some servers do not like it
        self._socket = utils.SocketWrapper(sock)
        self.start()

    def disconnect_socket(self):
        """
        Ask the receiving thread to stop, then shut down and close the
        socket. The thread is not joined here
        """
        self.logger.info("stop request")
        self._do_stop = True
        self._socket.socket.shutdown(socket.SHUT_WR)
        self._socket.socket.close()

    def send_hello(self, url):
        """
        Send a Hello message for url and wait up to self.timeout seconds
        for the answer, which is returned
        """
        hello = ua.Hello()
        hello.EndpointUrl = url
        future = Future()
        with self._lock:
            # Hello carries no request id: the answer is delivered under 0
            self._callbackmap[0] = future
        binmsg = self._connection.tcp_to_binary(ua.MessageType.Hello, hello)
        self._socket.write(binmsg)
        ack = future.result(self.timeout)
        return ack

    def open_secure_channel(self, params):
        """
        Send an OpenSecureChannelRequest with params, hand the security
        token of the answer to the connection and return the response
        parameters
        """
        self.logger.info("open_secure_channel")
        request = ua.OpenSecureChannelRequest()
        request.Parameters = params
        future = self._send_request(request, message_type=ua.MessageType.SecureOpen)

        response = ua.OpenSecureChannelResponse.from_binary(future.result(self.timeout))
        response.ResponseHeader.ServiceResult.check()
        self._connection.set_security_token(response.Parameters.SecurityToken)
        return response.Parameters

    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepared to reconnect.
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close socket
        """
        self.logger.info("close_secure_channel")
        request = ua.CloseSecureChannelRequest()
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
        with self._lock:
            # don't expect any more answers
            future.cancel()
            self._callbackmap.clear()

        # some servers send a response here, most do not ... so we ignore
179
180
181
class UaClient(object):

    """
    low level OPC-UA client.
    implement all(well..one day) methods defined in opcua spec
    taking in argument the structures defined in opcua spec
    in python most of the structures are defined in
    uaprotocol_auto.py and uaprotocol_hand.py
    """

    def __init__(self, timeout=1):
        """
        timeout is the number of seconds to wait for an answer; it is
        passed to the socket client created in connect_socket
        """
        self.logger = logging.getLogger(__name__)
        # _publishcallbacks should be accessed in recv thread only
        # subscription id -> callback receiving the publish parameters
        self._publishcallbacks = {}
        self._timeout = timeout
        self._uasocket = None
        self._security_policy = ua.SecurityPolicy()

    def set_security(self, policy):
        """
        Store the security policy used for the next connect_socket call
        """
        self._security_policy = policy

    def connect_socket(self, host, port):
        """
        connect to server socket and start receiving thread
        """
        self._uasocket = UASocketClient(self._timeout, security_policy=self._security_policy)
        return self._uasocket.connect_socket(host, port)

    def disconnect_socket(self):
        """
        Stop the socket client; see UASocketClient.disconnect_socket
        """
        return self._uasocket.disconnect_socket()

    def send_hello(self, url):
        """
        Send a Hello message for url and return the server answer
        """
        return self._uasocket.send_hello(url)

    def open_secure_channel(self, params):
        """
        Open a secure channel with params and return the response
        parameters
        """
        return self._uasocket.open_secure_channel(params)

    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepared to reconnect
        """
        return self._uasocket.close_secure_channel()

    def create_session(self, parameters):
        """
        Send a CreateSessionRequest and return the response parameters.
        The authentication token of the answer is stored on the socket
        client so that it is written in the header of later requests
        """
        self.logger.info("create_session")
        request = ua.CreateSessionRequest()
        request.Parameters = parameters
        data = self._uasocket.send_request(request)
        response = ua.CreateSessionResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        self._uasocket.authentication_token = response.Parameters.AuthenticationToken
        return response.Parameters

    def activate_session(self, parameters):
        """
        Send an ActivateSessionRequest and return the response parameters
        """
        self.logger.info("activate_session")
        request = ua.ActivateSessionRequest()
        request.Parameters = parameters
        data = self._uasocket.send_request(request)
        response = ua.ActivateSessionResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters

    def close_session(self, deletesubscriptions):
        """
        Send a CloseSessionRequest; deletesubscriptions is written in the
        DeleteSubscriptions field of the request. Nothing is returned
        """
        self.logger.info("close_session")
        request = ua.CloseSessionRequest()
        request.DeleteSubscriptions = deletesubscriptions
        data = self._uasocket.send_request(request)
        # the answer is decoded but its ServiceResult is deliberately not
        # checked: the check was disabled because it seems we send a wrong
        # session id (where is the session id supposed to be sent?)
        ua.CloseSessionResponse.from_binary(data)

    def browse(self, parameters):
        """
        Send a BrowseRequest and return the results of the answer
        """
        self.logger.info("browse")
        request = ua.BrowseRequest()
        request.Parameters = parameters
        data = self._uasocket.send_request(request)
        response = ua.BrowseResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def read(self, parameters):
        """
        Send a ReadRequest and return the results of the answer.
        Good values read from the NodeClass and ValueRank attributes are
        converted to the matching ua enum before being returned
        """
        self.logger.info("read")
        request = ua.ReadRequest()
        request.Parameters = parameters
        data = self._uasocket.send_request(request)
        response = ua.ReadResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        # cast to Enum attributes that need to
        # results are matched to the requested nodes by position
        for idx, rv in enumerate(parameters.NodesToRead):
            if rv.AttributeId == ua.AttributeIds.NodeClass:
                dv = response.Results[idx]
                if dv.StatusCode.is_good():
                    dv.Value.Value = ua.NodeClass(dv.Value.Value)
            elif rv.AttributeId == ua.AttributeIds.ValueRank:
                dv = response.Results[idx]
                # only the listed values are converted, others are
                # left as received
                if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
                    dv.Value.Value = ua.ValueRank(dv.Value.Value)
        return response.Results

    def write(self, params):
        """
        Send a WriteRequest and return the results of the answer
        """
        # this used to log "read", which made write calls look like reads
        self.logger.info("write")
        request = ua.WriteRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.WriteResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def get_endpoints(self, params):
        """
        Send a GetEndpointsRequest and return the endpoints of the answer
        """
        self.logger.info("get_endpoint")
        request = ua.GetEndpointsRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.GetEndpointsResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Endpoints

    def find_servers(self, params):
        """
        Send a FindServersRequest and return the servers of the answer
        """
        self.logger.info("find_servers")
        request = ua.FindServersRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.FindServersResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Servers

    def find_servers_on_network(self, params):
        """
        Send a FindServersOnNetworkRequest and return the response
        parameters
        """
        self.logger.info("find_servers_on_network")
        request = ua.FindServersOnNetworkRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.FindServersOnNetworkResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters

    def register_server(self, registered_server):
        """
        Send a RegisterServerRequest for registered_server.
        Nothing is returned
        """
        self.logger.info("register_server")
        request = ua.RegisterServerRequest()
        request.Server = registered_server
        data = self._uasocket.send_request(request)
        response = ua.RegisterServerResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        # nothing to return for this service

    def register_server2(self, params):
        """
        Send a RegisterServer2Request and return the configuration
        results of the answer
        """
        self.logger.info("register_server2")
        request = ua.RegisterServer2Request()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.RegisterServer2Response.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.ConfigurationResults

    def translate_browsepaths_to_nodeids(self, browsepaths):
        """
        Send a TranslateBrowsePathsToNodeIdsRequest for browsepaths and
        return the results of the answer
        """
        self.logger.info("translate_browsepath_to_nodeid")
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
        request.Parameters.BrowsePaths = browsepaths
        data = self._uasocket.send_request(request)
        response = ua.TranslateBrowsePathsToNodeIdsResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def create_subscription(self, params, callback):
        """
        Send a CreateSubscriptionRequest and return the response
        parameters. callback is registered for the new subscription id
        and later receives the parameters of publish responses.
        Waits at most the client timeout for the answer
        """
        self.logger.info("create_subscription")
        request = ua.CreateSubscriptionRequest()
        request.Parameters = params
        resp_fut = Future()
        # the answer is handled in a done callback of the request future
        # so that the publish callback is registered from there
        mycallbak = partial(self._create_subscription_callback, callback, resp_fut)
        self._uasocket.send_request(request, mycallbak)
        return resp_fut.result(self._timeout)

    def _create_subscription_callback(self, pub_callback, resp_fut, data_fut):
        """
        Done callback for create_subscription: decode the answer,
        register pub_callback and complete resp_fut
        """
        # NOTE(review): an exception raised here (decoding error or bad
        # ServiceResult) is swallowed by the Future callback machinery, so
        # resp_fut is never completed and the waiting caller only sees a
        # timeout
        self.logger.info("_create_subscription_callback")
        data = data_fut.result()
        response = ua.CreateSubscriptionResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        self._publishcallbacks[response.Parameters.SubscriptionId] = pub_callback
        resp_fut.set_result(response.Parameters)

    def delete_subscriptions(self, subscriptionids):
        """
        Send a DeleteSubscriptionsRequest for subscriptionids and return
        the results of the answer. The callbacks registered for these
        ids are removed. Waits at most the client timeout for the answer
        """
        self.logger.info("delete_subscription")
        request = ua.DeleteSubscriptionsRequest()
        request.Parameters.SubscriptionIds = subscriptionids
        resp_fut = Future()
        mycallbak = partial(self._delete_subscriptions_callback, subscriptionids, resp_fut)
        self._uasocket.send_request(request, mycallbak)
        return resp_fut.result(self._timeout)

    def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut):
        """
        Done callback for delete_subscriptions: decode the answer,
        forget the publish callbacks and complete resp_fut
        """
        # NOTE(review): as in _create_subscription_callback, any exception
        # raised here (including KeyError for an id that was never
        # registered) leaves resp_fut pending until the caller times out
        self.logger.info("_delete_subscriptions_callback")
        data = data_fut.result()
        response = ua.DeleteSubscriptionsResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        for sid in subscriptionids:
            self._publishcallbacks.pop(sid)
        resp_fut.set_result(response.Results)

    def publish(self, acks=None):
        """
        Send a PublishRequest carrying the acknowledgements in acks
        (none by default). Does not wait: the answer is handled by
        _call_publish_callback
        """
        self.logger.info("publish")
        if acks is None:
            acks = []
        request = ua.PublishRequest()
        request.Parameters.SubscriptionAcknowledgements = acks
        self._uasocket.send_request(request, self._call_publish_callback, timeout=int(9e8))  # timeout could be set to 0 but some servers do not support it

    def _call_publish_callback(self, future):
        """
        Done callback for publish: decode the publish response and pass
        its parameters to the callback registered for the subscription
        """
        self.logger.info("call_publish_callback")
        data = future.result()
        self._uasocket.check_answer(data, "ServiceFault received from server while waiting for publish response")
        try:
            response = ua.PublishResponse.from_binary(data)
        except Exception:
            self.logger.exception("Error parsing notificatipn from server")
            self.publish([])  # send publish request to server so it does not stop sending notifications
            return
        if response.Parameters.SubscriptionId not in self._publishcallbacks:
            self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
            return
        callback = self._publishcallbacks[response.Parameters.SubscriptionId]
        try:
            callback(response.Parameters)
        except Exception:  # we call client code, catch everything!
            # the message used to contain a "%s" without any argument,
            # which logging prints literally; name the callback instead
            self.logger.exception("Exception while calling user callback: %s", callback)

    def create_monitored_items(self, params):
        """
        Send a CreateMonitoredItemsRequest and return the results of the
        answer
        """
        self.logger.info("create_monitored_items")
        request = ua.CreateMonitoredItemsRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.CreateMonitoredItemsResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def delete_monitored_items(self, params):
        """
        Send a DeleteMonitoredItemsRequest and return the results of the
        answer
        """
        self.logger.info("delete_monitored_items")
        request = ua.DeleteMonitoredItemsRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.DeleteMonitoredItemsResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def add_nodes(self, nodestoadd):
        """
        Send an AddNodesRequest for nodestoadd and return the results of
        the answer
        """
        self.logger.info("add_nodes")
        request = ua.AddNodesRequest()
        request.Parameters.NodesToAdd = nodestoadd
        data = self._uasocket.send_request(request)
        response = ua.AddNodesResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def delete_nodes(self, nodestodelete):
        """
        Send a DeleteNodesRequest for nodestodelete and return the
        results of the answer
        """
        self.logger.info("delete_nodes")
        request = ua.DeleteNodesRequest()
        request.Parameters.NodesToDelete = nodestodelete
        data = self._uasocket.send_request(request)
        response = ua.DeleteNodesResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def call(self, methodstocall):
        """
        Send a CallRequest for methodstocall and return the results of
        the answer
        """
        # logged like every other service call of this class
        self.logger.info("call")
        request = ua.CallRequest()
        request.Parameters.MethodsToCall = methodstocall
        data = self._uasocket.send_request(request)
        response = ua.CallResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def history_read(self, params):
        """
        Send a HistoryReadRequest and return the results of the answer
        """
        self.logger.info("history_read")
        request = ua.HistoryReadRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.HistoryReadResponse.from_binary(data)
        response.ResponseHeader.ServiceResult.check()
        return response.Results
457