Test Failed
Pull Request — master (#268)
by
unknown
05:11
created

KeepAlive._connection_interrupted()   A

Complexity

Conditions 3

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 3

Importance

Changes 2
Bugs 0 Features 1
Metric Value
cc 3
dl 0
loc 4
ccs 1
cts 1
cp 1
crap 3
rs 10
c 2
b 0
f 1
1 1
from __future__ import division  # support for python2
2 1
from threading import Thread, Condition, Lock
3 1
import logging
4 1
try:
5 1
    from urllib.parse import urlparse
6
except ImportError:  # support for python2
7
    from urlparse import urlparse
8
9 1
from opcua import ua
10 1
from opcua.client.ua_client import UaClient
11 1
from opcua.common.node import Node
12 1
from opcua.common.manage_nodes import delete_nodes
13 1
from opcua.common.subscription import Subscription
14 1
from opcua.common import utils
15 1
from opcua.crypto import security_policies
16 1
from opcua.common.shortcuts import Shortcuts
17 1
use_crypto = True
18 1
try:
19
    from opcua.crypto import uacrypto
20
except ImportError:
21
    print("cryptography is not installed, use of crypto disabled")
22
    use_crypto = False
23
24 1
25
class KeepAlive(Thread):

    """
    Used by Client to keep session opened.
    OPCUA defines timeout both for sessions and secure channel

    Once started, the thread wakes up every ``timeout`` milliseconds,
    renews the secure channel of the client and reads the server state.
    If any of these steps raises, the registered interruption callbacks
    are called with the exception and the thread terminates.
    """

    def __init__(self, client, timeout):
        """
        client is the object used to renew the channel; run() calls its
        get_node() and open_secure_channel(renew=True) methods.
        timeout is the period between two renewals, in milliseconds.
        """
        Thread.__init__(self)
        self.logger = logging.getLogger(__name__)
        if timeout == 0:  # means no timeout but we do not trust such servers
            timeout = 360000
        self.timeout = timeout
        self.client = client
        self._dostop = False
        # _cond protects _dostop and is used to wake up run() from stop()
        self._cond = Condition()
        self._interruption_callbacks_lock = Lock()
        self._interruption_callbacks = set()

    def run(self):
        """
        Thread body: renew the channel periodically until stop() is called
        or an exception is raised by the client.
        """
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
        try:
            server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
            while True:
                with self._cond:
                    # the flag is tested while holding the condition so that
                    # a stop() issued just before we sleep cannot be missed
                    if not self._dostop:
                        self._cond.wait(self.timeout / 1000.0)
                    if self._dostop:
                        break
                self.logger.debug("renewing channel")
                self.client.open_secure_channel(renew=True)
                val = server_state.get_value()
                self.logger.debug("server state is: %s ", val)
        except Exception as e:
            self._connection_interrupted(e)
        finally:
            self.logger.debug("keepalive thread has stopped")

    def stop(self):
        """
        Ask the thread to stop and wake it up if it is sleeping.
        Does not wait for the thread to terminate.
        """
        self.logger.debug("stoping keepalive thread")
        with self._cond:
            self._dostop = True
            self._cond.notify_all()

    def register_interruption_callback(self, callback):
        """
        Register a callable taking one argument (the exception) which is
        called when the keepalive loop is interrupted by an exception.
        """
        with self._interruption_callbacks_lock:
            self._interruption_callbacks.add(callback)

    def _connection_interrupted(self, exception):
        """
        Call every registered interruption callback with exception.
        """
        # take a snapshot and release the lock before calling user code:
        # a callback may register another callback, which would otherwise
        # deadlock on the non-reentrant lock
        with self._interruption_callbacks_lock:
            callbacks = list(self._interruption_callbacks)
        for callback in callbacks:
            try:
                callback(exception)
            except Exception:
                # one faulty callback must not prevent the others from running
                self.logger.exception("error in keepalive interruption callback %s", callback)
76
77
class Client(object):

    """
    High level client to connect to an OPC-UA server.

    This class makes it easy to connect and browse address space.
    It attempts to expose as much functionality as possible
    but if you want more flexibility it is possible and advised to
    use UaClient object, available as self.uaclient
    which offers the raw OPC-UA services interface.
    """

    def __init__(self, url, timeout=4):
        """
        used url argument to connect to server.
        if you are unsure of url, write at least hostname and port
        and call get_endpoints
        timeout is the timeout to get an answer for requests to server
        public member of this call are available to be set by API users

        """
        self.logger = logging.getLogger(__name__)
        self.server_url = urlparse(url)
        self.name = "Pure Python Client"
        self.description = self.name
        self.application_uri = "urn:freeopcua:client"
        self.product_uri = "urn:freeopcua.github.no:client"
        self.security_policy = ua.SecurityPolicy()
        self.secure_channel_id = None
        self.default_timeout = 3600000
        self.secure_channel_timeout = self.default_timeout
        self.session_timeout = self.default_timeout
        self._policy_ids = []
        # set by create_session(); defined here so that activate_session()
        # does not fail with AttributeError when no nonce was received
        self._server_nonce = None
        self.uaclient = UaClient(timeout)
        self.user_certificate = None
        self.user_private_key = None
        self._session_counter = 1
        self.keepalive = None
        self.nodes = Shortcuts(self.uaclient)

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.disconnect()

    @staticmethod
    def find_endpoint(endpoints, security_mode, policy_uri):
        """
        Find endpoint with required security mode and policy URI.
        Only endpoints whose url starts with ua.OPC_TCP_SCHEME are considered.
        Raise ua.UaError if no endpoint matches.
        """
        for ep in endpoints:
            if (ep.EndpointUrl.startswith(ua.OPC_TCP_SCHEME) and
                    ep.SecurityMode == security_mode and
                    ep.SecurityPolicyUri == policy_uri):
                return ep
        raise ua.UaError("No matching endpoints: {}, {}".format(
                         security_mode, policy_uri))

    def set_security_string(self, string):
        """
        Set SecureConnection mode. String format:
        Policy,Mode,certificate,private_key[,server_certificate]
        where Policy is Basic128Rsa15 or Basic256,
            Mode is Sign or SignAndEncrypt
            certificate, private_key and server_certificate are
                paths to .pem or .der files
        An empty string is ignored.
        Call this before connect()
        """
        if not string:
            return
        parts = string.split(',')
        if len(parts) < 4:
            raise ua.UaError('Wrong format: `{}`, expected at least 4 comma-separated values'.format(string))
        policy_class = getattr(security_policies, 'SecurityPolicy' + parts[0])
        mode = getattr(ua.MessageSecurityMode, parts[1])
        server_certificate_path = parts[4] if len(parts) >= 5 else None
        return self.set_security(policy_class, parts[2], parts[3],
                                 server_certificate_path, mode)

    def set_security(self, policy, certificate_path, private_key_path,
                     server_certificate_path=None,
                     mode=ua.MessageSecurityMode.SignAndEncrypt):
        """
        Set SecureConnection mode.
        If server_certificate_path is None, the server certificate is
        taken from the matching endpoint returned by the server.
        Call this before connect()
        """
        if server_certificate_path is None:
            # load certificate from server's list of endpoints
            endpoints = self.connect_and_get_server_endpoints()
            endpoint = Client.find_endpoint(endpoints, mode, policy.URI)
            server_cert = uacrypto.x509_from_der(endpoint.ServerCertificate)
        else:
            server_cert = uacrypto.load_certificate(server_certificate_path)
        cert = uacrypto.load_certificate(certificate_path)
        pk = uacrypto.load_private_key(private_key_path)
        self.security_policy = policy(server_cert, cert, pk, mode)
        self.uaclient.set_security(self.security_policy)

    def load_client_certificate(self, path):
        """
        load our certificate from file, either pem or der
        """
        self.user_certificate = uacrypto.load_certificate(path)

    def load_private_key(self, path):
        """
        Load user private key. This is used for authenticating using certificate
        """
        self.user_private_key = uacrypto.load_private_key(path)

    def _request_over_temporary_connection(self, request):
        """
        Connect socket, say hello, open a secure channel, call request()
        and close everything again. The socket is closed even if one of
        the steps raises. Return the result of request().
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()  # spec says it should not be necessary to open channel for discovery
            result = request()
            self.close_secure_channel()
        finally:
            self.disconnect_socket()
        return result

    def connect_and_get_server_endpoints(self):
        """
        Connect, ask server for endpoints, and disconnect
        """
        return self._request_over_temporary_connection(self.get_endpoints)

    def connect_and_find_servers(self):
        """
        Connect, ask server for a list of known servers, and disconnect
        """
        return self._request_over_temporary_connection(self.find_servers)

    def connect_and_find_servers_on_network(self):
        """
        Connect, ask server for a list of known servers on network, and disconnect
        """
        return self._request_over_temporary_connection(self.find_servers_on_network)

    def connect(self):
        """
        High level method
        Connect, create and activate session
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
        except Exception:
            # do not leave an open socket behind if the handshake fails
            self.disconnect_socket()
            raise
        self.create_session()
        self.activate_session(username=self.server_url.username, password=self.server_url.password, certificate=self.user_certificate)

    def disconnect(self):
        """
        High level method
        Close session, secure channel and socket
        """
        try:
            self.close_session()
            self.close_secure_channel()
        finally:
            # always release the socket, even if the server does not answer
            self.disconnect_socket()

    @property
    def connected(self):
        """
        True if the keepalive thread exists and is alive.
        """
        # keepalive is None until create_session() has been called
        return self.keepalive is not None and self.keepalive.is_alive()

    def connect_socket(self):
        """
        connect to socket defined in url
        """
        self.uaclient.connect_socket(self.server_url.hostname, self.server_url.port)

    def disconnect_socket(self):
        self.uaclient.disconnect_socket()

    def send_hello(self):
        """
        Send OPC-UA hello to server
        """
        self.uaclient.send_hello(self.server_url.geturl())
        # FIXME check ack returned by send_hello

    def open_secure_channel(self, renew=False):
        """
        Open secure channel, if renew is True, renew channel
        """
        params = ua.OpenSecureChannelParameters()
        params.ClientProtocolVersion = 0
        if renew:
            params.RequestType = ua.SecurityTokenRequestType.Renew
        else:
            params.RequestType = ua.SecurityTokenRequestType.Issue
        params.SecurityMode = self.security_policy.Mode
        params.RequestedLifetime = self.secure_channel_timeout
        # length should be equal to the length of key of symmetric encryption
        nonce = utils.create_nonce(self.security_policy.symmetric_key_size)
        params.ClientNonce = nonce  # this nonce is used to create a symmetric key
        result = self.uaclient.open_secure_channel(params)
        self.security_policy.make_symmetric_key(nonce, result.ServerNonce)
        self.secure_channel_timeout = result.SecurityToken.RevisedLifetime

    def close_secure_channel(self):
        return self.uaclient.close_secure_channel()

    def get_endpoints(self):
        """
        Send a GetEndpoints request for the url of this client
        """
        params = ua.GetEndpointsParameters()
        params.EndpointUrl = self.server_url.geturl()
        return self.uaclient.get_endpoints(params)

    def register_server(self, server, discovery_configuration=None):
        """
        register a server to discovery server
        if discovery_configuration is provided, the newer register_server2 service call is used
        """
        serv = ua.RegisteredServer()
        serv.ServerUri = server.application_uri
        serv.ProductUri = server.product_uri
        serv.DiscoveryUrls = [server.endpoint.geturl()]
        serv.ServerType = server.application_type
        serv.ServerNames = [ua.LocalizedText(server.name)]
        serv.IsOnline = True
        if not discovery_configuration:
            return self.uaclient.register_server(serv)
        params = ua.RegisterServer2Parameters()
        params.Server = serv
        params.DiscoveryConfiguration = discovery_configuration
        return self.uaclient.register_server2(params)

    def find_servers(self, uris=None):
        """
        send a FindServer request to the server. The answer should be a list of
        servers the server knows about
        A list of uris can be provided, only server having matching uris will be returned
        """
        if uris is None:
            uris = []
        params = ua.FindServersParameters()
        params.EndpointUrl = self.server_url.geturl()
        params.ServerUris = uris
        return self.uaclient.find_servers(params)

    def find_servers_on_network(self):
        params = ua.FindServersOnNetworkParameters()
        return self.uaclient.find_servers_on_network(params)

    def create_session(self):
        """
        send a CreateSessionRequest to server with reasonable parameters.
        If you want to modify settings look at code of this method
        and make your own
        On success the keepalive thread is started.
        """
        desc = ua.ApplicationDescription()
        desc.ApplicationUri = self.application_uri
        desc.ProductUri = self.product_uri
        desc.ApplicationName = ua.LocalizedText(self.name)
        desc.ApplicationType = ua.ApplicationType.Client

        params = ua.CreateSessionParameters()
        nonce = utils.create_nonce(32)  # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
        params.ClientNonce = nonce
        params.ClientCertificate = self.security_policy.client_certificate
        params.ClientDescription = desc
        params.EndpointUrl = self.server_url.geturl()
        params.SessionName = self.description + " Session" + str(self._session_counter)
        # session_timeout is a public member (3600000 unless changed by the
        # user), used the same way open_secure_channel() uses secure_channel_timeout
        params.RequestedSessionTimeout = self.session_timeout
        params.MaxResponseMessageSize = 0  # means no max size
        response = self.uaclient.create_session(params)
        # the server signs our certificate (if any) followed by our nonce
        if self.security_policy.client_certificate is None:
            data = nonce
        else:
            data = self.security_policy.client_certificate + nonce
        self.security_policy.asymmetric_cryptography.verify(data, response.ServerSignature.Signature)
        self._server_nonce = response.ServerNonce
        if not self.security_policy.server_certificate:
            self.security_policy.server_certificate = response.ServerCertificate
        elif self.security_policy.server_certificate != response.ServerCertificate:
            raise ua.UaError("Server certificate mismatch")
        # remember PolicyId's: we will use them in activate_session()
        ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
        self._policy_ids = ep.UserIdentityTokens
        self.session_timeout = response.RevisedSessionTimeout
        self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7)  # 0.7 is from spec
        self.keepalive.start()
        return response

    def server_policy_id(self, token_type, default):
        """
        Find PolicyId of server's UserTokenPolicy by token_type.
        Return default if there's no matching UserTokenPolicy.
        """
        for policy in self._policy_ids:
            if policy.TokenType == token_type:
                return policy.PolicyId
        return default

    def server_policy_uri(self, token_type):
        """
        Find SecurityPolicyUri of server's UserTokenPolicy by token_type.
        If SecurityPolicyUri is empty, use default SecurityPolicyUri
        of the endpoint
        """
        for policy in self._policy_ids:
            if policy.TokenType == token_type:
                # empty URI means "use this endpoint's policy URI"
                return policy.SecurityPolicyUri or self.security_policy.URI
        return self.security_policy.URI

    def activate_session(self, username=None, password=None, certificate=None):
        """
        Activate session using either username and password or private_key
        If neither username nor certificate is given, anonymous
        authentication is used. A certificate takes precedence over username.
        """
        params = ua.ActivateSessionParameters()
        # data to sign: server certificate followed by last server nonce
        challenge = b""
        if self.security_policy.server_certificate is not None:
            challenge += self.security_policy.server_certificate
        if self._server_nonce is not None:
            challenge += self._server_nonce
        params.ClientSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(challenge)
        params.LocaleIds.append("en")
        if certificate:
            self._add_certificate_auth(params, certificate, challenge)
        elif username:
            self._add_user_auth(params, username, password)
        else:
            self._add_anonymous_auth(params)
        return self.uaclient.activate_session(params)

    def _add_anonymous_auth(self, params):
        params.UserIdentityToken = ua.AnonymousIdentityToken()
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, b"anonymous")

    def _add_certificate_auth(self, params, certificate, challenge):
        params.UserIdentityToken = ua.X509IdentityToken()
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, b"certificate_basic256")
        params.UserIdentityToken.CertificateData = uacrypto.der_from_x509(certificate)
        # specs part 4, 5.6.3.1: the data to sign is created by appending
        # the last serverNonce to the serverCertificate
        sig = uacrypto.sign_sha1(self.user_private_key, challenge)
        params.UserTokenSignature = ua.SignatureData()
        params.UserTokenSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.UserTokenSignature.Signature = sig

    def _add_user_auth(self, params, username, password):
        params.UserIdentityToken = ua.UserNameIdentityToken()
        params.UserIdentityToken.UserName = username
        policy_uri = self.server_policy_uri(ua.UserTokenType.UserName)
        # the password argument decides whether a password is sent, not the
        # url: activate_session() may be called with explicit credentials
        if not policy_uri or policy_uri == security_policies.POLICY_NONE_URI:
            # see specs part 4, 7.36.3: if the token is NOT encrypted,
            # then the password only contains UTF-8 encoded password
            # and EncryptionAlgorithm is null
            if password:
                self.logger.warning("Sending plain-text password")
                params.UserIdentityToken.Password = password
            params.UserIdentityToken.EncryptionAlgorithm = ''
        elif password:
            data, uri = self._encrypt_password(password, policy_uri)
            params.UserIdentityToken.Password = data
            params.UserIdentityToken.EncryptionAlgorithm = uri
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, b"username_basic256")

    def _encrypt_password(self, password, policy_uri):
        """
        Encrypt password with the public key of the server certificate.
        Return a tuple (encrypted data, encryption algorithm uri).
        """
        pubkey = uacrypto.x509_from_der(self.security_policy.server_certificate).public_key()
        # see specs part 4, 7.36.3: if the token is encrypted, password
        # shall be converted to UTF-8 and serialized with server nonce
        passwd = password.encode("utf8")
        if self._server_nonce is not None:
            passwd += self._server_nonce
        etoken = ua.pack_bytes(passwd)
        return security_policies.encrypt_asymmetric(pubkey, etoken, policy_uri)

    def close_session(self):
        """
        Close session
        The keepalive thread, if any, is asked to stop first.
        """
        if self.keepalive:
            self.keepalive.stop()
        return self.uaclient.close_session(True)

    def get_root_node(self):
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))

    def get_objects_node(self):
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))

    def get_server_node(self):
        return self.get_node(ua.FourByteNodeId(ua.ObjectIds.Server))

    def get_node(self, nodeid):
        """
        Get node using NodeId object or a string representing a NodeId
        """
        return Node(self.uaclient, nodeid)

    def create_subscription(self, period, handler):
        """
        Create a subscription.
        returns a Subscription object which allow
        to subscribe to events or data on server
        handler argument is a class with data_change and/or event methods.
        These methods will be called when notification from server are received.
        See example-client.py.
        Do not do expensive/slow or network operation from these methods
        since they are called directly from receiving thread. This is a design choice,
        start another thread if you need to do such a thing.
        """
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = period
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 10000
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.uaclient, params, handler)

    def get_namespace_array(self):
        """
        Return the value of the Server_NamespaceArray node
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        return ns_node.get_value()

    def get_namespace_index(self, uri):
        """
        Return the index of uri in the namespace array of the server
        """
        uris = self.get_namespace_array()
        return uris.index(uri)

    def delete_nodes(self, nodes, recursive=False):
        return delete_nodes(self.uaclient, nodes, recursive)
509
            
510