Passed
Push — master ( 96bac7...bd9702 )
by Olivier
12:44 queued 05:54
created

Client.connect_and_find_servers()   A

Complexity

Conditions 1

Size

Total Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.7023

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 13
ccs 1
cts 9
cp 0.1111
crap 1.7023
rs 9.4285
1 1
from __future__ import division  # support for python2
2 1
from threading import Thread, Condition
3 1
import logging
4 1
try:
5 1
    from urllib.parse import urlparse
6 1
except ImportError:  # support for python2
7 1
    from urlparse import urlparse
8
9 1
from opcua import ua
10 1
from opcua.client.ua_client import UaClient
11 1
from opcua.common.xmlimporter import XmlImporter
12 1
from opcua.common.xmlexporter import XmlExporter
13 1
from opcua.common.node import Node
14 1
from opcua.common.manage_nodes import delete_nodes
15 1
from opcua.common.subscription import Subscription
16 1
from opcua.common import utils
17 1
from opcua.crypto import security_policies
18 1
from opcua.common.shortcuts import Shortcuts
19 1
from opcua.common.structures import load_type_definitions
20 1
use_crypto = True
21 1
try:
22 1
    from opcua.crypto import uacrypto
23
except ImportError:
24
    logging.getLogger(__name__).warning("cryptography is not installed, use of crypto disabled")
25
    use_crypto = False
26
27
28 1
class KeepAlive(Thread):

    """
    Used by Client to keep the session open.
    OPCUA defines timeout both for sessions and secure channel
    """

    def __init__(self, client, timeout):
        """
        :param client: object used to renew the channel; ``run`` calls its
            ``get_node`` and ``open_secure_channel`` methods.
        :param timeout: period between two renewals, in milliseconds.
            A value of 0 is replaced by one hour.
        """
        Thread.__init__(self)
        self.logger = logging.getLogger(__name__)

        self.client = client
        self._dostop = False
        # protects _dostop and lets stop() interrupt the wait in run()
        self._cond = Condition()
        self.timeout = timeout

        # some server support no timeout, but we do not trust them
        if self.timeout == 0:
            self.timeout = 3600000  # 1 hour

    def run(self):
        """
        Renew the secure channel and read the server state once per
        period, until stop() is called.
        """
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
        server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
        while True:
            with self._cond:
                # The stop flag is tested while holding the condition, so a
                # stop() issued just before we start waiting cannot be
                # missed (its notify would otherwise be lost and the thread
                # would sleep for a full period).
                if not self._dostop:
                    self._cond.wait(self.timeout / 1000)  # wait() takes seconds
                if self._dostop:
                    break
            self.logger.debug("renewing channel")
            self.client.open_secure_channel(renew=True)
            val = server_state.get_value()
            self.logger.debug("server state is: %s ", val)
        self.logger.debug("keepalive thread has stopped")

    def stop(self):
        """
        Ask the thread to terminate and wake it up if it is waiting.
        Does not wait for the thread to finish.
        """
        self.logger.debug("stoping keepalive thread")
        with self._cond:
            # set the flag under the lock, see run()
            self._dostop = True
            self._cond.notify_all()
71
72
73 1
class Client(object):

    """
    High level client to connect to an OPC-UA server.

    This class makes it easy to connect and browse address space.
    It attempts to expose as much functionality as possible
    but if you want more flexibility it is possible and advised to
    use UaClient object, available as self.uaclient
    which offers the raw OPC-UA services interface.
    """

    def __init__(self, url, timeout=4):
        """

        :param url: url of the server.
            if you are unsure of url, write at least hostname
            and port and call get_endpoints

        :param timeout:
            Each request sent to the server expects an answer within this
            time. The timeout is specified in seconds.
        """
        self.logger = logging.getLogger(__name__)
        self.server_url = urlparse(url)
        # take initial username and password from the url
        self._username = self.server_url.username
        self._password = self.server_url.password
        self.name = "Pure Python Client"
        self.description = self.name
        self.application_uri = "urn:freeopcua:client"
        self.product_uri = "urn:freeopcua.github.no:client"
        self.security_policy = ua.SecurityPolicy()
        self.secure_channel_id = None
        self.secure_channel_timeout = 3600000  # 1 hour
        self.session_timeout = 3600000  # 1 hour
        self._policy_ids = []
        self.uaclient = UaClient(timeout)
        self.user_certificate = None
        self.user_private_key = None
        self._server_nonce = None
        self._session_counter = 1
        self.keepalive = None
        self.nodes = Shortcuts(self.uaclient)
        self.max_messagesize = 0  # No limits
        self.max_chunkcount = 0  # No limits

    def __enter__(self):
        """
        Connect when entering a ``with`` block and return the client.
        """
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """
        Disconnect when leaving a ``with`` block; exceptions are not suppressed.
        """
        self.disconnect()

    @staticmethod
    def find_endpoint(endpoints, security_mode, policy_uri):
        """
        Find endpoint with required security mode and policy URI

        Only endpoints whose url starts with ua.OPC_TCP_SCHEME are considered.
        The first match is returned, ua.UaError is raised if there is none.
        """
        for ep in endpoints:
            if (ep.EndpointUrl.startswith(ua.OPC_TCP_SCHEME) and
                    ep.SecurityMode == security_mode and
                    ep.SecurityPolicyUri == policy_uri):
                return ep
        raise ua.UaError("No matching endpoints: {0}, {1}".format(security_mode, policy_uri))

    def set_user(self, username):
        """
        Set user name for the connection.
        initial user from the URL will be overwritten
        """
        self._username = username

    def set_password(self, pwd):
        """
        Set user password for the connection.
        initial password from the URL will be overwritten
        """
        self._password = pwd

    def set_security_string(self, string):
        """
        Set SecureConnection mode. String format:
        Policy,Mode,certificate,private_key[,server_certificate]
        where Policy is the suffix of a SecurityPolicy class of
                opcua.crypto.security_policies (e.g. Basic128Rsa15 or Basic256),
            Mode is a member of ua.MessageSecurityMode (Sign or SignAndEncrypt),
            certificate, private_key and server_certificate are
                paths to .pem or .der files
        An empty string is ignored. ua.UaError is raised if fewer than
        4 comma-separated values are given.
        Call this before connect()
        """
        if not string:
            return
        parts = string.split(',')
        if len(parts) < 4:
            raise ua.UaError('Wrong format: `{0}`, expected at least 4 comma-separated values'.format(string))
        policy_class = getattr(security_policies, 'SecurityPolicy' + parts[0])
        mode = getattr(ua.MessageSecurityMode, parts[1])
        # the optional fifth value is the path to the server certificate
        server_certificate_path = parts[4] if len(parts) >= 5 else None
        return self.set_security(policy_class, parts[2], parts[3],
                                 server_certificate_path, mode)

    def set_security(self, policy, certificate_path, private_key_path,
                     server_certificate_path=None,
                     mode=ua.MessageSecurityMode.SignAndEncrypt):
        """
        Set SecureConnection mode.
        Call this before connect()

        :param policy: security policy class, called as
            policy(server_cert, cert, private_key, mode)
        :param certificate_path: path to our certificate
        :param private_key_path: path to our private key
        :param server_certificate_path: path to the server certificate.
            If None, the client connects to the server, asks for its
            endpoints and uses the certificate of the endpoint matching
            mode and policy.URI
        :param mode: a ua.MessageSecurityMode value
        """
        if server_certificate_path is None:
            # load certificate from server's list of endpoints
            endpoints = self.connect_and_get_server_endpoints()
            endpoint = Client.find_endpoint(endpoints, mode, policy.URI)
            server_cert = uacrypto.x509_from_der(endpoint.ServerCertificate)
        else:
            server_cert = uacrypto.load_certificate(server_certificate_path)
        cert = uacrypto.load_certificate(certificate_path)
        pk = uacrypto.load_private_key(private_key_path)
        self.security_policy = policy(server_cert, cert, pk, mode)
        self.uaclient.set_security(self.security_policy)

    def load_client_certificate(self, path):
        """
        load our certificate from file, either pem or der
        """
        self.user_certificate = uacrypto.load_certificate(path)

    def load_private_key(self, path):
        """
        Load user private key. This is used for authenticating using certificate
        """
        self.user_private_key = uacrypto.load_private_key(path)

    def _call_on_temporary_channel(self, request):
        """
        Connect the socket, say hello, open a secure channel, call request()
        and close the channel. The socket is always disconnected, even if
        one of the steps raises. Return the result of request().
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            result = request()
            self.close_secure_channel()
        finally:
            self.disconnect_socket()
        return result

    def connect_and_get_server_endpoints(self):
        """
        Connect, ask server for endpoints, and disconnect
        """
        return self._call_on_temporary_channel(self.get_endpoints)

    def connect_and_find_servers(self):
        """
        Connect, ask server for a list of known servers, and disconnect
        """
        # spec says it should not be necessary to open channel
        return self._call_on_temporary_channel(self.find_servers)

    def connect_and_find_servers_on_network(self):
        """
        Connect, ask server for a list of known servers on network, and disconnect
        """
        return self._call_on_temporary_channel(self.find_servers_on_network)

    def connect(self):
        """
        High level method
        Connect, create and activate session

        The session is activated with the user name, password and user
        certificate currently stored on the client.
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            self.create_session()
        except:  # deliberately bare: whatever happened is re-raised after cleanup
            self.disconnect_socket()  # clean up open socket
            raise
        self.activate_session(username=self._username, password=self._password, certificate=self.user_certificate)

    def disconnect(self):
        """
        High level method
        Close session, secure channel and socket

        The socket is disconnected even if closing the session or the
        channel raises.
        """
        try:
            self.close_session()
            self.close_secure_channel()
        finally:
            self.disconnect_socket()

    def connect_socket(self):
        """
        connect to socket defined in url
        """
        self.uaclient.connect_socket(self.server_url.hostname, self.server_url.port)

    def disconnect_socket(self):
        """
        Disconnect the socket of the underlying UaClient
        """
        self.uaclient.disconnect_socket()

    def send_hello(self):
        """
        Send OPC-UA hello to server
        """
        # FIXME the acknowledge returned by the server is not checked
        self.uaclient.send_hello(self.server_url.geturl(), self.max_messagesize, self.max_chunkcount)

    def open_secure_channel(self, renew=False):
        """
        Open secure channel, if renew is True, renew channel

        The lifetime revised by the server is stored in
        self.secure_channel_timeout.
        """
        params = ua.OpenSecureChannelParameters()
        params.ClientProtocolVersion = 0
        if renew:
            params.RequestType = ua.SecurityTokenRequestType.Renew
        else:
            params.RequestType = ua.SecurityTokenRequestType.Issue
        params.SecurityMode = self.security_policy.Mode
        params.RequestedLifetime = self.secure_channel_timeout
        # length should be equal to the length of key of symmetric encryption
        nonce = utils.create_nonce(self.security_policy.symmetric_key_size)
        params.ClientNonce = nonce  # this nonce is used to create a symmetric key
        result = self.uaclient.open_secure_channel(params)
        self.security_policy.make_symmetric_key(nonce, result.ServerNonce)
        self.secure_channel_timeout = result.SecurityToken.RevisedLifetime

    def close_secure_channel(self):
        """
        Close the secure channel of the underlying UaClient
        """
        return self.uaclient.close_secure_channel()

    def get_endpoints(self):
        """
        Ask the server for its endpoints, using the url given to the client
        """
        params = ua.GetEndpointsParameters()
        params.EndpointUrl = self.server_url.geturl()
        return self.uaclient.get_endpoints(params)

    def register_server(self, server, discovery_configuration=None):
        """
        register a server to discovery server
        if discovery_configuration is provided, the newer register_server2 service call is used

        :param server: object describing the server to register; its
            application_uri, product_uri, endpoint, application_type and
            name attributes are read
        """
        serv = ua.RegisteredServer()
        serv.ServerUri = server.application_uri
        serv.ProductUri = server.product_uri
        serv.DiscoveryUrls = [server.endpoint.geturl()]
        serv.ServerType = server.application_type
        serv.ServerNames = [ua.LocalizedText(server.name)]
        serv.IsOnline = True
        if not discovery_configuration:
            return self.uaclient.register_server(serv)
        params = ua.RegisterServer2Parameters()
        params.Server = serv
        params.DiscoveryConfiguration = discovery_configuration
        return self.uaclient.register_server2(params)

    def find_servers(self, uris=None):
        """
        send a FindServer request to the server. The answer should be a list of
        servers the server knows about
        A list of uris can be provided, only server having matching uris will be returned
        """
        if uris is None:
            uris = []
        params = ua.FindServersParameters()
        params.EndpointUrl = self.server_url.geturl()
        params.ServerUris = uris
        return self.uaclient.find_servers(params)

    def find_servers_on_network(self):
        """
        send a FindServersOnNetwork request with default parameters
        """
        params = ua.FindServersOnNetworkParameters()
        return self.uaclient.find_servers_on_network(params)

    def create_session(self):
        """
        send a CreateSessionRequest to server with reasonable parameters.
        If you want to modify settings look at code of this method
        and make your own

        The server signature is verified, the server certificate is
        recorded (or compared with the one already known, ua.UaError is
        raised on mismatch) and a KeepAlive thread is started.
        Return the server response.
        """
        desc = ua.ApplicationDescription()
        desc.ApplicationUri = self.application_uri
        desc.ProductUri = self.product_uri
        desc.ApplicationName = ua.LocalizedText(self.name)
        desc.ApplicationType = ua.ApplicationType.Client

        params = ua.CreateSessionParameters()
        # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
        nonce = utils.create_nonce(32)
        params.ClientNonce = nonce
        params.ClientCertificate = self.security_policy.client_certificate
        params.ClientDescription = desc
        params.EndpointUrl = self.server_url.geturl()
        params.SessionName = self.description + " Session" + str(self._session_counter)
        params.RequestedSessionTimeout = 3600000
        params.MaxResponseMessageSize = 0  # means no max size
        response = self.uaclient.create_session(params)

        # the server signs our certificate (if any) followed by our nonce
        if self.security_policy.client_certificate is None:
            data = nonce
        else:
            data = self.security_policy.client_certificate + nonce
        self.security_policy.asymmetric_cryptography.verify(data, response.ServerSignature.Signature)

        self._server_nonce = response.ServerNonce
        if not self.security_policy.server_certificate:
            self.security_policy.server_certificate = response.ServerCertificate
        elif self.security_policy.server_certificate != response.ServerCertificate:
            raise ua.UaError("Server certificate mismatch")
        # remember PolicyId's: we will use them in activate_session()
        ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
        self._policy_ids = ep.UserIdentityTokens
        self.session_timeout = response.RevisedSessionTimeout
        # renew before the shortest of the two timeouts expires
        period = min(self.session_timeout, self.secure_channel_timeout) * 0.7  # 0.7 is from spec
        self.keepalive = KeepAlive(self, period)
        self.keepalive.start()
        return response

    def server_policy_id(self, token_type, default):
        """
        Find PolicyId of server's UserTokenPolicy by token_type.
        Return default if there's no matching UserTokenPolicy.
        """
        for policy in self._policy_ids:
            if policy.TokenType == token_type:
                return policy.PolicyId
        return default

    def server_policy_uri(self, token_type):
        """
        Find SecurityPolicyUri of server's UserTokenPolicy by token_type.
        If SecurityPolicyUri is empty, use default SecurityPolicyUri
        of the endpoint
        """
        for policy in self._policy_ids:
            if policy.TokenType == token_type:
                # empty URI means "use this endpoint's policy URI"
                return policy.SecurityPolicyUri or self.security_policy.URI
        return self.security_policy.URI

    def activate_session(self, username=None, password=None, certificate=None):
        """
        Activate session using either username and password or private_key

        Without username and certificate the session is activated
        anonymously. If a certificate is given it takes precedence over
        the username.
        """
        params = ua.ActivateSessionParameters()
        # data to sign: server certificate followed by the last server nonce
        challenge = b""
        if self.security_policy.server_certificate is not None:
            challenge += self.security_policy.server_certificate
        if self._server_nonce is not None:
            challenge += self._server_nonce
        params.ClientSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(challenge)
        params.LocaleIds.append("en")
        if certificate:
            self._add_certificate_auth(params, certificate, challenge)
        elif username:
            self._add_user_auth(params, username, password)
        else:
            self._add_anonymous_auth(params)
        return self.uaclient.activate_session(params)

    def _add_anonymous_auth(self, params):
        """
        Fill params with an anonymous identity token
        """
        params.UserIdentityToken = ua.AnonymousIdentityToken()
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, b"anonymous")

    def _add_certificate_auth(self, params, certificate, challenge):
        """
        Fill params with an X509 identity token for certificate and sign
        challenge with self.user_private_key
        """
        params.UserIdentityToken = ua.X509IdentityToken()
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, b"certificate_basic256")
        params.UserIdentityToken.CertificateData = uacrypto.der_from_x509(certificate)
        # specs part 4, 5.6.3.1: the data to sign is created by appending
        # the last serverNonce to the serverCertificate
        sig = uacrypto.sign_sha1(self.user_private_key, challenge)
        params.UserTokenSignature = ua.SignatureData()
        params.UserTokenSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.UserTokenSignature.Signature = sig

    def _add_user_auth(self, params, username, password):
        """
        Fill params with a user name identity token.

        The password is sent only if one is given; it is encrypted unless
        the policy of the server for user name tokens is empty or None.
        """
        params.UserIdentityToken = ua.UserNameIdentityToken()
        params.UserIdentityToken.UserName = username
        policy_uri = self.server_policy_uri(ua.UserTokenType.UserName)
        # The decision is based on the password argument, not on
        # self._password: the two differ when activate_session() is called
        # directly with explicit credentials.
        if not policy_uri or policy_uri == security_policies.POLICY_NONE_URI:
            # see specs part 4, 7.36.3: if the token is NOT encrypted,
            # then the password only contains UTF-8 encoded password
            # and EncryptionAlgorithm is null
            if password:
                self.logger.warning("Sending plain-text password")
                # NOTE(review): the password is passed as given, not
                # explicitly encoded to UTF-8 - confirm the serializer does it
                params.UserIdentityToken.Password = password
            params.UserIdentityToken.EncryptionAlgorithm = None
        elif password:
            data, uri = self._encrypt_password(password, policy_uri)
            params.UserIdentityToken.Password = data
            params.UserIdentityToken.EncryptionAlgorithm = uri
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, b"username_basic256")

    def _encrypt_password(self, password, policy_uri):
        """
        Encrypt password with the public key of the server certificate.
        Return the encrypted data and the uri returned by
        security_policies.encrypt_asymmetric
        """
        pubkey = uacrypto.x509_from_der(self.security_policy.server_certificate).public_key()
        # see specs part 4, 7.36.3: if the token is encrypted, password
        # shall be converted to UTF-8 and serialized with server nonce
        passwd = password.encode("utf8")
        if self._server_nonce is not None:
            passwd += self._server_nonce
        etoken = ua.ua_binary.Primitives.Bytes.pack(passwd)
        data, uri = security_policies.encrypt_asymmetric(pubkey, etoken, policy_uri)
        return data, uri

    def close_session(self):
        """
        Close session

        The keepalive thread, if any, is asked to stop first.
        """
        if self.keepalive:
            self.keepalive.stop()
        return self.uaclient.close_session(True)

    def get_root_node(self):
        """
        Return the Node for ua.ObjectIds.RootFolder
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))

    def get_objects_node(self):
        """
        Return the Node for ua.ObjectIds.ObjectsFolder
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))

    def get_server_node(self):
        """
        Return the Node for ua.ObjectIds.Server
        """
        return self.get_node(ua.FourByteNodeId(ua.ObjectIds.Server))

    def get_node(self, nodeid):
        """
        Get node using NodeId object or a string representing a NodeId
        """
        return Node(self.uaclient, nodeid)

    def create_subscription(self, period, handler):
        """
        Create a subscription.
        returns a Subscription object which allow
        to subscribe to events or data on server

        period argument is either a publishing interval in milliseconds or a
        CreateSubscriptionParameters instance. The second option should be used,
        if the opcua-server has problems with the default options.

        handler argument is a class with data_change and/or event methods.
        These methods will be called when notification from server are received.
        See example-client.py.
        Do not do expensive/slow or network operation from these methods
        since they are called directly from receiving thread. This is a design choice,
        start another thread if you need to do such a thing.
        """
        if isinstance(period, ua.CreateSubscriptionParameters):
            return Subscription(self.uaclient, period, handler)
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = period
        params.RequestedLifetimeCount = 10000
        params.RequestedMaxKeepAliveCount = 3000
        params.MaxNotificationsPerPublish = 10000
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.uaclient, params, handler)

    def get_namespace_array(self):
        """
        Return the value of the Server_NamespaceArray node
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        return ns_node.get_value()

    def get_namespace_index(self, uri):
        """
        Return the index of uri in the namespace array of the server
        """
        uries = self.get_namespace_array()
        return uries.index(uri)

    def delete_nodes(self, nodes, recursive=False):
        """
        Delete nodes on the server, see opcua.common.manage_nodes.delete_nodes
        """
        return delete_nodes(self.uaclient, nodes, recursive)

    def import_xml(self, path):
        """
        Import nodes defined in xml
        """
        importer = XmlImporter(self)
        return importer.import_xml(path)

    def export_xml(self, nodes, path):
        """
        Export defined nodes to xml
        """
        exp = XmlExporter(self)
        exp.build_etree(nodes)
        return exp.write_xml(path)

    def register_namespace(self, uri):
        """
        Register a new namespace. Nodes should be in a custom namespace, not 0.
        This method is mainly implemented for symmetry with server

        Return the index of the namespace; if uri is already in the
        namespace array its existing index is returned.
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        uries = ns_node.get_value()
        if uri in uries:
            return uries.index(uri)
        uries.append(uri)
        ns_node.set_value(uries)
        return len(uries) - 1

    def load_type_definitions(self, nodes=None):
        """
        Call opcua.common.structures.load_type_definitions with this client
        """
        return load_type_definitions(self, nodes)
571
572
573