Passed
Pull Request — master (#545)
by
unknown
03:52
created

Client.connect()   A

Complexity

Conditions 2

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 2.108

Importance

Changes 2
Bugs 0 Features 1
Metric Value
cc 2
c 2
b 0
f 1
dl 0
loc 14
ccs 7
cts 10
cp 0.7
crap 2.108
rs 9.4285
1 1
from __future__ import division  # support for python2
2 1
from threading import Thread, Condition
3 1
import logging
4 1
try:
5 1
    from urllib.parse import urlparse
6 1
except ImportError:  # support for python2
7 1
    from urlparse import urlparse
8
9 1
from concurrent.futures import _base
10 1
from opcua import ua
11 1
from opcua.client.ua_client import UaClient
12 1
from opcua.common.xmlimporter import XmlImporter
13 1
from opcua.common.xmlexporter import XmlExporter
14 1
from opcua.common.node import Node
15 1
from opcua.common.manage_nodes import delete_nodes
16 1
from opcua.common.subscription import Subscription
17 1
from opcua.common import utils
18 1
from opcua.crypto import security_policies
19 1
from opcua.common.shortcuts import Shortcuts
20 1
from opcua.common.structures import load_type_definitions
21 1
use_crypto = True
22 1
try:
23 1
    from opcua.crypto import uacrypto
24
except ImportError:
25
    logging.getLogger(__name__).warning("cryptography is not installed, use of crypto disabled")
26
    use_crypto = False
27
28
29 1
class KeepAlive(Thread):

    """
    Used by Client to keep the session open.
    OPCUA defines timeout both for sessions and secure channel
    """

    def __init__(self, client, timeout):
        """
        :param client: object whose ``get_node`` and
            ``open_secure_channel(renew=True)`` methods are called
            from the keepalive loop.
        :param timeout: period between two channel renewals,
            in milliseconds. A value of 0 is replaced by one hour.
        """
        Thread.__init__(self)
        self.logger = logging.getLogger(__name__)

        self.client = client
        self._dostop = False
        self._cond = Condition()
        self.timeout = timeout

        # some server support no timeout, but we do not trust them
        if self.timeout == 0:
            self.timeout = 3600000 # 1 hour

    def run(self):
        """
        Renew the secure channel and read the server state once per
        period, until stop() is called.
        """
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
        server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
        while True:
            with self._cond:
                # The flag is tested while holding the lock, so a stop()
                # issued just before we start waiting cannot be missed
                # (stop() also sets the flag under this lock).
                if not self._dostop:
                    self._cond.wait(self.timeout / 1000.0)  # wait() takes seconds
                if self._dostop:
                    break
            self.logger.debug("renewing channel")
            self.client.open_secure_channel(renew=True)
            val = server_state.get_value()
            self.logger.debug("server state is: %s ", val)
        self.logger.debug("keepalive thread has stopped")

    def stop(self):
        """
        Ask the keepalive loop to exit and wake it up if it is waiting.
        """
        self.logger.debug("stoping keepalive thread")
        with self._cond:
            # set the flag and notify atomically with respect to run()
            self._dostop = True
            self._cond.notify_all()
72
73
74 1
class Client(object):
75
76
    """
77
    High level client to connect to an OPC-UA server.
78
79
    This class makes it easy to connect and browse address space.
80
    It attempts to expose as much functionality as possible
81
    but if you want more flexibility it is possible and advised to
82
    use UaClient object, available as self.uaclient
83
    which offers the raw OPC-UA services interface.
84
    """
85
86 1
    def __init__(self, url, timeout=4):
        """
        Set up the client state; nothing is sent to the server here.

        :param url: url of the server.
            if you are unsure of url, write at least hostname
            and port and call get_endpoints

        :param timeout:
            Each request sent to the server expects an answer within this
            time. The timeout is specified in seconds.
        """
        self.logger = logging.getLogger(__name__)

        # server address; credentials embedded in the url are the initial ones
        parsed_url = urlparse(url)
        self.server_url = parsed_url
        self._username = parsed_url.username
        self._password = parsed_url.password

        # how this application describes itself to the server
        self.name = "Pure Python Client"
        self.description = self.name
        self.application_uri = "urn:freeopcua:client"
        self.product_uri = "urn:freeopcua.github.no:client"

        # secure channel and session settings
        self.security_policy = ua.SecurityPolicy()
        self.secure_channel_id = None
        one_hour_ms = 3600000
        self.secure_channel_timeout = one_hour_ms
        self.session_timeout = one_hour_ms
        self._policy_ids = []

        self.uaclient = UaClient(timeout)

        # user authentication material and per-session state
        self.user_certificate = None
        self.user_private_key = None
        self._server_nonce = None
        self._session_counter = 1
        self.keepalive = None

        self.nodes = Shortcuts(self.uaclient)

        # values passed to the hello message; 0 means no limits
        self.max_messagesize = 0
        self.max_chunkcount = 0
120
121
122 1
    def __enter__(self):
123 1
        self.connect()
124 1
        return self
125
126 1
    def __exit__(self, exc_type, exc_value, traceback):
127 1
        self.disconnect()
128
129 1
    @staticmethod
    def find_endpoint(endpoints, security_mode, policy_uri):
        """
        Return the first endpoint whose url starts with ua.OPC_TCP_SCHEME
        and whose security mode and policy URI equal the requested ones.

        :param endpoints: iterable of endpoint descriptions
        :param security_mode: value compared with ``SecurityMode``
        :param policy_uri: value compared with ``SecurityPolicyUri``
        :raises ua.UaError: if no endpoint matches
        """
        for candidate in endpoints:
            uses_tcp = candidate.EndpointUrl.startswith(ua.OPC_TCP_SCHEME)
            if uses_tcp and candidate.SecurityMode == security_mode \
                    and candidate.SecurityPolicyUri == policy_uri:
                return candidate
        message = "No matching endpoints: {0}, {1}".format(security_mode, policy_uri)
        raise ua.UaError(message)
140
141 1
    def set_user(self, username):
142
        """
143
        Set user name for the connection.
144
        initial user from the URL will be overwritten
145
        """
146
        self._username = username
147
148 1
    def set_password(self, pwd):
149
        """
150
        Set user password for the connection.
151
        initial password from the URL will be overwritten
152
        """
153
        self._password = pwd   
154
155 1
    def set_security_string(self, string):
156
        """
157
        Set SecureConnection mode. String format:
158
        Policy,Mode,certificate,private_key[,server_private_key]
159
        where Policy is Basic128Rsa15 or Basic256,
160
            Mode is Sign or SignAndEncrypt
161
            certificate, private_key and server_private_key are
162
                paths to .pem or .der files
163
        Call this before connect()
164
        """
165 1
        if not string:
166
            return
167 1
        parts = string.split(',')
168 1
        if len(parts) < 4:
169
            raise ua.UaError('Wrong format: `{0}`, expected at least 4 comma-separated values'.format(string))
170 1
        policy_class = getattr(security_policies, 'SecurityPolicy' + parts[0])
171 1
        mode = getattr(ua.MessageSecurityMode, parts[1])
172 1
        return self.set_security(policy_class, parts[2], parts[3],
173
                                 parts[4] if len(parts) >= 5 else None, mode)
174
175 1
    def set_security(self, policy, certificate_path, private_key_path,
                     server_certificate_path=None,
                     mode=ua.MessageSecurityMode.SignAndEncrypt):
        """
        Set SecureConnection mode.
        Call this before connect()

        :param policy: security policy class; it is called as
            ``policy(server_cert, cert, private_key, mode)`` and its
            ``URI`` attribute is used to select the server endpoint
        :param certificate_path: path of our certificate
        :param private_key_path: path of our private key
        :param server_certificate_path: path of the server certificate.
            When None, the certificate is taken from the matching
            endpoint returned by connect_and_get_server_endpoints()
        :param mode: message security mode
        """
        if server_certificate_path is not None:
            server_cert = uacrypto.load_certificate(server_certificate_path)
        else:
            # load certificate from server's list of endpoints
            server_endpoints = self.connect_and_get_server_endpoints()
            matching = Client.find_endpoint(server_endpoints, mode, policy.URI)
            server_cert = uacrypto.x509_from_der(matching.ServerCertificate)
        client_cert = uacrypto.load_certificate(certificate_path)
        private_key = uacrypto.load_private_key(private_key_path)
        self.security_policy = policy(server_cert, client_cert, private_key, mode)
        self.uaclient.set_security(self.security_policy)
193
194 1
    def load_client_certificate(self, path):
        """
        Load our certificate from file (pem or der) and keep it
        as ``self.user_certificate``.

        :param path: path of the certificate file
        """
        loaded_certificate = uacrypto.load_certificate(path)
        self.user_certificate = loaded_certificate
199
200 1
    def load_private_key(self, path):
        """
        Load the user private key and keep it as ``self.user_private_key``.
        This is used for authenticating using certificate

        :param path: path of the private key file
        """
        loaded_key = uacrypto.load_private_key(path)
        self.user_private_key = loaded_key
205
206 1
    def connect_and_get_server_endpoints(self):
207
        """
208
        Connect, ask server for endpoints, and disconnect
209
        """
210 1
        try:
211 1
            self.connect_socket()
212 1
            self.send_hello()
213 1
            self.open_secure_channel()
214 1
            endpoints = self.get_endpoints()
215 1
            self.close_secure_channel()
216
        finally:
217 1
            self.disconnect_socket()
218 1
        return endpoints
219
220 1
    def connect_and_find_servers(self):
221
        """
222
        Connect, ask server for a list of known servers, and disconnect
223
        """
224
        try:
225
            self.connect_socket()
226
            self.send_hello()
227
            self.open_secure_channel()  # spec says it should not be necessary to open channel
228
            servers = self.find_servers()
229
            self.close_secure_channel()
230
        finally:
231
            self.disconnect_socket()
232
        return servers
233
234 1
    def connect_and_find_servers_on_network(self):
235
        """
236
        Connect, ask server for a list of known servers on network, and disconnect
237
        """
238
        try:
239
            self.connect_socket()
240
            self.send_hello()
241
            self.open_secure_channel()
242
            servers = self.find_servers_on_network()
243
            self.close_secure_channel()
244
        finally:
245
            self.disconnect_socket()
246
        return servers
247
248 1
    def connect(self):
249
        """
250
        High level method
251
        Connect, create and activate session
252
        """
253 1
        self.connect_socket()
254 1
        self.send_hello()
255 1
        self.open_secure_channel()
256 1
        try:
257 1
            self.create_session()
258
        except _base.TimeoutError as e:
259
            self.disconnect_socket() # clean up open socket
260
            raise e
261 1
        self.activate_session(username=self._username, password=self._password, certificate=self.user_certificate)
262
263 1
    def disconnect(self):
264
        """
265
        High level method
266
        Close session, secure channel and socket
267
        """
268 1
        try:
269 1
            self.close_session()
270 1
            self.close_secure_channel()
271
        finally:
272 1
            self.disconnect_socket()
273
274 1
    def connect_socket(self):
        """
        Connect the underlying UaClient to the hostname and port
        of the server url.
        """
        target = self.server_url
        self.uaclient.connect_socket(target.hostname, target.port)
279
280 1
    def disconnect_socket(self):
        """
        Disconnect the socket of the underlying UaClient.
        """
        self.uaclient.disconnect_socket()
282
283 1
    def send_hello(self):
        """
        Send OPC-UA hello to server, announcing the server url and the
        configured max message size and max chunk count.
        The acknowledge returned by the server is currently not checked.
        """
        ack = self.uaclient.send_hello(
            self.server_url.geturl(),
            self.max_messagesize,
            self.max_chunkcount)
        # FIXME check ack
289
290 1
    def open_secure_channel(self, renew=False):
        """
        Open secure channel, if renew is True, renew channel

        The symmetric key of the security policy is derived from our
        nonce and the server nonce, and ``secure_channel_timeout`` is
        updated with the lifetime revised by the server.
        """
        if renew:
            request_type = ua.SecurityTokenRequestType.Renew
        else:
            request_type = ua.SecurityTokenRequestType.Issue

        params = ua.OpenSecureChannelParameters()
        params.ClientProtocolVersion = 0
        params.RequestType = request_type
        params.SecurityMode = self.security_policy.Mode
        params.RequestedLifetime = self.secure_channel_timeout
        # length should be equal to the length of key of symmetric encryption
        client_nonce = utils.create_nonce(self.security_policy.symmetric_key_size)
        # this nonce is used to create a symmetric key
        params.ClientNonce = client_nonce

        answer = self.uaclient.open_secure_channel(params)
        self.security_policy.make_symmetric_key(client_nonce, answer.ServerNonce)
        self.secure_channel_timeout = answer.SecurityToken.RevisedLifetime
306
307 1
    def close_secure_channel(self):
        """
        Close the secure channel through the underlying UaClient
        and return its result.
        """
        outcome = self.uaclient.close_secure_channel()
        return outcome
309
310 1
    def get_endpoints(self):
        """
        Send a GetEndpoints request for the server url
        and return the answer of the underlying UaClient.
        """
        request = ua.GetEndpointsParameters()
        request.EndpointUrl = self.server_url.geturl()
        return self.uaclient.get_endpoints(request)
314
315 1
    def register_server(self, server, discovery_configuration=None):
        """
        register a server to discovery server
        if discovery_configuration is provided, the newer register_server2 service call is used

        :param server: object providing ``application_uri``,
            ``product_uri``, ``endpoint``, ``application_type`` and ``name``
        :param discovery_configuration: optional configuration sent
            along with the registration
        :return: result of the service call
        """
        registration = ua.RegisteredServer()
        registration.ServerUri = server.application_uri
        registration.ProductUri = server.product_uri
        registration.DiscoveryUrls = [server.endpoint.geturl()]
        registration.ServerType = server.application_type
        registration.ServerNames = [ua.LocalizedText(server.name)]
        registration.IsOnline = True

        if not discovery_configuration:
            return self.uaclient.register_server(registration)

        request = ua.RegisterServer2Parameters()
        request.Server = registration
        request.DiscoveryConfiguration = discovery_configuration
        return self.uaclient.register_server2(request)
334
335 1
    def find_servers(self, uris=None):
        """
        send a FindServer request to the server. The answer should be a list of
        servers the server knows about
        A list of uris can be provided, only server having matching uris will be returned

        :param uris: list of server uris; None means an empty list
        """
        request = ua.FindServersParameters()
        request.EndpointUrl = self.server_url.geturl()
        request.ServerUris = [] if uris is None else uris
        return self.uaclient.find_servers(request)
347
348 1
    def find_servers_on_network(self):
        """
        Send a FindServersOnNetwork request with default parameters
        and return the answer of the underlying UaClient.
        """
        request = ua.FindServersOnNetworkParameters()
        return self.uaclient.find_servers_on_network(request)
351
352 1
    def create_session(self):
        """
        send a CreateSessionRequest to server with reasonable parameters.
        If you want to modify settings look at code of this method
        and make your own

        The requested session timeout is ``self.session_timeout``
        (one hour unless changed before connecting); after the call it
        holds the timeout revised by the server.
        On success the server nonce and user token policies are stored
        for activate_session() and the KeepAlive thread is started.

        :return: response of the CreateSession service call
        :raises ua.UaError: if the server certificate differs from the one
            already known, or if no matching endpoint is returned
        """
        desc = ua.ApplicationDescription()
        desc.ApplicationUri = self.application_uri
        desc.ProductUri = self.product_uri
        desc.ApplicationName = ua.LocalizedText(self.name)
        desc.ApplicationType = ua.ApplicationType.Client

        params = ua.CreateSessionParameters()
        nonce = utils.create_nonce(32)  # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
        params.ClientNonce = nonce
        params.ClientCertificate = self.security_policy.client_certificate
        params.ClientDescription = desc
        params.EndpointUrl = self.server_url.geturl()
        params.SessionName = self.description + " Session" + str(self._session_counter)
        # honour the configured timeout instead of a hard-coded hour
        params.RequestedSessionTimeout = self.session_timeout
        params.MaxResponseMessageSize = 0  # means no max size
        response = self.uaclient.create_session(params)

        # the server signs our certificate (if any) followed by our nonce
        if self.security_policy.client_certificate is None:
            data = nonce
        else:
            data = self.security_policy.client_certificate + nonce
        self.security_policy.asymmetric_cryptography.verify(data, response.ServerSignature.Signature)
        self._server_nonce = response.ServerNonce

        if not self.security_policy.server_certificate:
            self.security_policy.server_certificate = response.ServerCertificate
        elif self.security_policy.server_certificate != response.ServerCertificate:
            raise ua.UaError("Server certificate mismatch")

        # remember PolicyId's: we will use them in activate_session()
        ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
        self._policy_ids = ep.UserIdentityTokens
        self.session_timeout = response.RevisedSessionTimeout
        self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7)  # 0.7 is from spec
        self.keepalive.start()
        return response
391
392 1
    def server_policy_id(self, token_type, default):
393
        """
394
        Find PolicyId of server's UserTokenPolicy by token_type.
395
        Return default if there's no matching UserTokenPolicy.
396
        """
397 1
        for policy in self._policy_ids:
398 1
            if policy.TokenType == token_type:
399 1
                return policy.PolicyId
400
        return default
401
402 1
    def server_policy_uri(self, token_type):
403
        """
404
        Find SecurityPolicyUri of server's UserTokenPolicy by token_type.
405
        If SecurityPolicyUri is empty, use default SecurityPolicyUri
406
        of the endpoint
407
        """
408 1
        for policy in self._policy_ids:
409 1
            if policy.TokenType == token_type:
410 1
                if policy.SecurityPolicyUri:
411
                    return policy.SecurityPolicyUri
412
                else:   # empty URI means "use this endpoint's policy URI"
413 1
                    return self.security_policy.URI
414
        return self.security_policy.URI
415
416 1
    def activate_session(self, username=None, password=None, certificate=None):
        """
        Activate session using a user certificate, a user name and
        password, or anonymously when neither is given.
        A certificate takes precedence over a user name.

        :return: result of the ActivateSession service call
        """
        # data to sign: server certificate followed by the last server nonce
        pieces = [self.security_policy.server_certificate, self._server_nonce]
        challenge = b""
        for piece in pieces:
            if piece is not None:
                challenge += piece

        params = ua.ActivateSessionParameters()
        params.ClientSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(challenge)
        params.LocaleIds.append("en")

        if certificate:
            self._add_certificate_auth(params, certificate, challenge)
        elif username:
            self._add_user_auth(params, username, password)
        else:
            self._add_anonymous_auth(params)
        return self.uaclient.activate_session(params)
436
437 1
    def _add_anonymous_auth(self, params):
        """
        Put an anonymous identity token into *params*, using the server's
        anonymous PolicyId or b"anonymous" when the server gave none.
        """
        token = ua.AnonymousIdentityToken()
        params.UserIdentityToken = token
        token.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, b"anonymous")
440
441 1
    def _add_certificate_auth(self, params, certificate, challenge):
        """
        Put an X509 identity token for *certificate* into *params* and
        sign *challenge* with ``self.user_private_key``.
        """
        token = ua.X509IdentityToken()
        params.UserIdentityToken = token
        token.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, b"certificate_basic256")
        token.CertificateData = uacrypto.der_from_x509(certificate)
        # specs part 4, 5.6.3.1: the data to sign is created by appending
        # the last serverNonce to the serverCertificate
        signature = uacrypto.sign_sha1(self.user_private_key, challenge)
        token_signature = ua.SignatureData()
        params.UserTokenSignature = token_signature
        token_signature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        token_signature.Signature = signature
451
452 1
    def _add_user_auth(self, params, username, password):
        """
        Put a user name identity token into *params*.

        The password is sent only when *password* is non-empty: as given
        when the user token policy is empty or the "None" policy, encrypted
        with the server certificate otherwise.

        :param params: ActivateSession parameters to fill in
        :param username: user name to send
        :param password: password to send; the decision to send it is
            based on this argument, not on the stored password
        """
        params.UserIdentityToken = ua.UserNameIdentityToken()
        params.UserIdentityToken.UserName = username
        policy_uri = self.server_policy_uri(ua.UserTokenType.UserName)
        if not policy_uri or policy_uri == security_policies.POLICY_NONE_URI:
            # see specs part 4, 7.36.3: if the token is NOT encrypted,
            # then the password only contains UTF-8 encoded password
            # and EncryptionAlgorithm is null
            if password:
                self.logger.warning("Sending plain-text password")
                params.UserIdentityToken.Password = password
            params.UserIdentityToken.EncryptionAlgorithm = None
        elif password:
            data, uri = self._encrypt_password(password, policy_uri)
            params.UserIdentityToken.Password = data
            params.UserIdentityToken.EncryptionAlgorithm = uri
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, b"username_basic256")
469
470 1
    def _encrypt_password(self, password, policy_uri):
        """
        Encrypt *password* with the public key of the server certificate.

        :return: tuple of the two values returned by
            security_policies.encrypt_asymmetric (stored by the caller as
            token Password and EncryptionAlgorithm)
        """
        server_cert = uacrypto.x509_from_der(self.security_policy.server_certificate)
        public_key = server_cert.public_key()
        # see specs part 4, 7.36.3: if the token is encrypted, password
        # shall be converted to UTF-8 and serialized with server nonce
        secret = password.encode("utf8")
        if self._server_nonce is not None:
            secret += self._server_nonce
        packed = ua.ua_binary.Primitives.Bytes.pack(secret)
        encrypted, algorithm = security_policies.encrypt_asymmetric(public_key, packed, policy_uri)
        return encrypted, algorithm
480
481 1
    def close_session(self):
        """
        Stop the keepalive thread, if any, then close the session
        through the underlying UaClient and return its result.
        """
        keepalive = self.keepalive
        if keepalive:
            keepalive.stop()
        return self.uaclient.close_session(True)
488
489 1
    def get_root_node(self):
        """
        Return the node for ua.ObjectIds.RootFolder.
        """
        root_id = ua.TwoByteNodeId(ua.ObjectIds.RootFolder)
        return self.get_node(root_id)
491
492 1
    def get_objects_node(self):
        """
        Return the node for ua.ObjectIds.ObjectsFolder.
        """
        objects_id = ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder)
        return self.get_node(objects_id)
494
495 1
    def get_server_node(self):
        """
        Return the node for ua.ObjectIds.Server.
        """
        server_id = ua.FourByteNodeId(ua.ObjectIds.Server)
        return self.get_node(server_id)
497
498 1
    def get_node(self, nodeid):
        """
        Build a Node bound to this client's UaClient.

        :param nodeid: NodeId object or a string representing a NodeId
        """
        node = Node(self.uaclient, nodeid)
        return node
503
504 1
    def create_subscription(self, period, handler):
        """
        Create a subscription.
        returns a Subscription object which allow
        to subscribe to events or data on server

        :param period: either a publishing interval in milliseconds or a
            CreateSubscriptionParameters instance. The second option should
            be used, if the opcua-server has problems with the default options.
        :param handler: a class with data_change and/or event methods.
            These methods will be called when notification from server are
            received. See example-client.py.
            Do not do expensive/slow or network operation from these methods
            since they are called directly from receiving thread. This is a
            design choice, start another thread if you need to do such a thing.
        """
        if isinstance(period, ua.CreateSubscriptionParameters):
            # caller supplied complete parameters, use them untouched
            settings = period
        else:
            settings = ua.CreateSubscriptionParameters()
            settings.RequestedPublishingInterval = period
            settings.RequestedLifetimeCount = 10000
            settings.RequestedMaxKeepAliveCount = 3000
            settings.MaxNotificationsPerPublish = 10000
            settings.PublishingEnabled = True
            settings.Priority = 0
        return Subscription(self.uaclient, settings, handler)
530
531 1
    def get_namespace_array(self):
        """
        Return the value of the Server_NamespaceArray node.
        """
        array_id = ua.NodeId(ua.ObjectIds.Server_NamespaceArray)
        return self.get_node(array_id).get_value()
534
535 1
    def get_namespace_index(self, uri):
536 1
        uries = self.get_namespace_array()
537 1
        return uries.index(uri)
538
539 1
    def delete_nodes(self, nodes, recursive=False):
        """
        Delete *nodes* by calling the module-level delete_nodes helper
        with this client's UaClient, and return its result.
        """
        outcome = delete_nodes(self.uaclient, nodes, recursive)
        return outcome
541
542 1
    def import_xml(self, path):
        """
        Import nodes defined in xml

        :param path: passed to XmlImporter.import_xml
        :return: result of XmlImporter.import_xml
        """
        return XmlImporter(self).import_xml(path)
548
549 1
    def export_xml(self, nodes, path):
        """
        Export defined nodes to xml

        :param nodes: nodes handed to XmlExporter.build_etree
        :param path: passed to XmlExporter.write_xml
        :return: result of XmlExporter.write_xml
        """
        exporter = XmlExporter(self)
        exporter.build_etree(nodes)
        return exporter.write_xml(path)
556
557 1
    def register_namespace(self, uri):
        """
        Register a new namespace. Nodes should be in a custom namespace, not 0.
        This method is mainly implemented for symmetry with server

        :return: index of *uri* in the namespace array; when the uri is
            not yet present it is appended and the array is written back
        """
        array_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        namespaces = array_node.get_value()
        if uri not in namespaces:
            namespaces.append(uri)
            array_node.set_value(namespaces)
            return len(namespaces) - 1
        return namespaces.index(uri)
569
570 1
    def load_type_definitions(self, nodes=None):
        """
        Call the module-level load_type_definitions helper with this
        client and *nodes*, and return its result.
        """
        outcome = load_type_definitions(self, nodes)
        return outcome
572
573
574