Completed
Push — master ( 6c2020...aef276 )
by Olivier
04:52
created

Client.set_user()   A

Complexity

Conditions 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.125

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 1
c 1
b 0
f 1
dl 0
loc 6
ccs 1
cts 2
cp 0.5
crap 1.125
rs 9.4285
1 1
from __future__ import division  # support for python2
2 1
from threading import Thread, Condition
3 1
import logging
4 1
try:
5 1
    from urllib.parse import urlparse
6
except ImportError:  # support for python2
7
    from urlparse import urlparse
8
9 1
from opcua import ua
10 1
from opcua.client.ua_client import UaClient
11 1
from opcua.common.xmlimporter import XmlImporter
12 1
from opcua.common.xmlexporter import XmlExporter
13 1
from opcua.common.node import Node
14 1
from opcua.common.manage_nodes import delete_nodes
15 1
from opcua.common.subscription import Subscription
16 1
from opcua.common import utils
17 1
from opcua.crypto import security_policies
18 1
from opcua.common.shortcuts import Shortcuts
19 1
use_crypto = True
20 1
try:
21 1
    from opcua.crypto import uacrypto
22
except ImportError:
23
    print("cryptography is not installed, use of crypto disabled")
24
    use_crypto = False
25
26
27 1
class KeepAlive(Thread):
28
29
    """
30
    Used by Client to keep the session open.
31
    OPCUA defines timeout both for sessions and secure channel
32
    """
33
34 1
    def __init__(self, client, timeout):
35
        """
36
        :param session_timeout: Timeout to re-new the session
37
            in milliseconds.
38
        """
39 1
        Thread.__init__(self)
40 1
        self.logger = logging.getLogger(__name__)
41
42 1
        self.client = client
43 1
        self._dostop = False
44 1
        self._cond = Condition()
45 1
        self.timeout = timeout
46
47
        # some server support no timeout, but we do not trust them
48 1
        if self.timeout == 0:
49
            self.timeout = 3600000 # 1 hour
50
51 1
    def run(self):
52 1
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
53 1
        server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
54 1
        while not self._dostop:
55 1
            with self._cond:
56 1
                self._cond.wait(self.timeout / 1000)
57 1
            if self._dostop:
58 1
                break
59
            self.logger.debug("renewing channel")
60
            self.client.open_secure_channel(renew=True)
61
            val = server_state.get_value()
62
            self.logger.debug("server state is: %s ", val)
63 1
        self.logger.debug("keepalive thread has stopped")
64
65 1
    def stop(self):
66 1
        self.logger.debug("stoping keepalive thread")
67 1
        self._dostop = True
68 1
        with self._cond:
69 1
            self._cond.notify_all()
70
71
72 1
class Client(object):
73
74
    """
75
    High level client to connect to an OPC-UA server.
76
77
    This class makes it easy to connect and browse address space.
78
    It attemps to expose as much functionality as possible
79
    but if you want more flexibility it is possible and adviced to
80
    use UaClient object, available as self.uaclient
81
    which offers the raw OPC-UA services interface.
82
    """
83
84 1
    def __init__(self, url, timeout=4):
85
        """
86
87
        :param url: url of the server.
88
            if you are unsure of url, write at least hostname
89
            and port and call get_endpoints
90
91
        :param timeout:
92
            Each request sent to the server expects an answer within this
93
            time. The timeout is specified in seconds.
94
        """
95 1
        self.logger = logging.getLogger(__name__)
96 1
        self.server_url = urlparse(url)
97
        #take initial username and password from the url
98 1
        self._username = self.server_url.username
99 1
        self._password = self.server_url.password
100 1
        self.name = "Pure Python Client"
101 1
        self.description = self.name
102 1
        self.application_uri = "urn:freeopcua:client"
103 1
        self.product_uri = "urn:freeopcua.github.no:client"
104 1
        self.security_policy = ua.SecurityPolicy()
105 1
        self.secure_channel_id = None
106 1
        self.secure_channel_timeout = 3600000 # 1 hour
107 1
        self.session_timeout = 3600000 # 1 hour
108 1
        self._policy_ids = []
109 1
        self.uaclient = UaClient(timeout)
110 1
        self.user_certificate = None
111 1
        self.user_private_key = None
112 1
        self._session_counter = 1
113 1
        self.keepalive = None
114 1
        self.nodes = Shortcuts(self.uaclient)
115
116 1
    def __enter__(self):
117 1
        self.connect()
118 1
        return self
119
120 1
    def __exit__(self, exc_type, exc_value, traceback):
121 1
        self.disconnect()
122
123 1
    @staticmethod
124
    def find_endpoint(endpoints, security_mode, policy_uri):
125
        """
126
        Find endpoint with required security mode and policy URI
127
        """
128 1
        for ep in endpoints:
129 1
            if (ep.EndpointUrl.startswith(ua.OPC_TCP_SCHEME) and
130
                    ep.SecurityMode == security_mode and
131
                    ep.SecurityPolicyUri == policy_uri):
132 1
                return ep
133 1
        raise ua.UaError("No matching endpoints: {0}, {1}".format(
134
                         security_mode, policy_uri))
135
136 1
    def set_user(self, username):
137
        """
138
        Set user name for the connection.
139
        initial user from the URL will be overwritten
140
        """
141
        self._username = username
142
143 1
    def set_password(self, pwd):
144
        """
145
        Set user password for the connection.
146
        initial password from the URL will be overwritten
147
        """
148
        self._password = pwd
149
150 1
    def set_security_string(self, string):
151
        """
152
        Set SecureConnection mode. String format:
153
        Policy,Mode,certificate,private_key[,server_private_key]
154
        where Policy is Basic128Rsa15 or Basic256,
155
            Mode is Sign or SignAndEncrypt
156
            certificate, private_key and server_private_key are
157
                paths to .pem or .der files
158
        Call this before connect()
159
        """
160 1
        if not string:
161
            return
162 1
        parts = string.split(',')
163 1
        if len(parts) < 4:
164
            raise ua.UaError('Wrong format: `{0}`, expected at least 4 comma-separated values'.format(string))
165 1
        policy_class = getattr(security_policies, 'SecurityPolicy' + parts[0])
166 1
        mode = getattr(ua.MessageSecurityMode, parts[1])
167 1
        return self.set_security(policy_class, parts[2], parts[3],
168
                                 parts[4] if len(parts) >= 5 else None, mode)
169
170 1
    def set_security(self, policy, certificate_path, private_key_path,
171
                     server_certificate_path=None,
172
                     mode=ua.MessageSecurityMode.SignAndEncrypt):
173
        """
174
        Set SecureConnection mode.
175
        Call this before connect()
176
        """
177 1
        if server_certificate_path is None:
178
            # load certificate from server's list of endpoints
179 1
            endpoints = self.connect_and_get_server_endpoints()
180 1
            endpoint = Client.find_endpoint(endpoints, mode, policy.URI)
181 1
            server_cert = uacrypto.x509_from_der(endpoint.ServerCertificate)
182
        else:
183
            server_cert = uacrypto.load_certificate(server_certificate_path)
184 1
        cert = uacrypto.load_certificate(certificate_path)
185 1
        pk = uacrypto.load_private_key(private_key_path)
186 1
        self.security_policy = policy(server_cert, cert, pk, mode)
187 1
        self.uaclient.set_security(self.security_policy)
188
189 1
    def load_client_certificate(self, path):
190
        """
191
        load our certificate from file, either pem or der
192
        """
193
        self.user_certificate = uacrypto.load_certificate(path)
194
195 1
    def load_private_key(self, path):
196
        """
197
        Load user private key. This is used for authenticating using certificate
198
        """
199
        self.user_private_key = uacrypto.load_private_key(path)
200
201 1
    def connect_and_get_server_endpoints(self):
202
        """
203
        Connect, ask server for endpoints, and disconnect
204
        """
205 1
        self.connect_socket()
206 1
        self.send_hello()
207 1
        self.open_secure_channel()
208 1
        endpoints = self.get_endpoints()
209 1
        self.close_secure_channel()
210 1
        self.disconnect_socket()
211 1
        return endpoints
212
213 1
    def connect_and_find_servers(self):
214
        """
215
        Connect, ask server for a list of known servers, and disconnect
216
        """
217
        self.connect_socket()
218
        self.send_hello()
219
        self.open_secure_channel()  # spec says it should not be necessary to open channel
220
        servers = self.find_servers()
221
        self.close_secure_channel()
222
        self.disconnect_socket()
223
        return servers
224
225 1
    def connect_and_find_servers_on_network(self):
226
        """
227
        Connect, ask server for a list of known servers on network, and disconnect
228
        """
229
        self.connect_socket()
230
        self.send_hello()
231
        self.open_secure_channel()
232
        servers = self.find_servers_on_network()
233
        self.close_secure_channel()
234
        self.disconnect_socket()
235
        return servers
236
237 1
    def connect(self):
238
        """
239
        High level method
240
        Connect, create and activate session
241
        """
242 1
        self.connect_socket()
243 1
        self.send_hello()
244 1
        self.open_secure_channel()
245 1
        self.create_session()
246 1
        self.activate_session(username=self._username, password=self._password, certificate=self.user_certificate)
247
248 1
    def disconnect(self):
249
        """
250
        High level method
251
        Close session, secure channel and socket
252
        """
253 1
        self.close_session()
254 1
        self.close_secure_channel()
255 1
        self.disconnect_socket()
256
257 1
    def connect_socket(self):
258
        """
259
        connect to socket defined in url
260
        """
261 1
        self.uaclient.connect_socket(self.server_url.hostname, self.server_url.port)
262
263 1
    def disconnect_socket(self):
264 1
        self.uaclient.disconnect_socket()
265
266 1
    def send_hello(self):
267
        """
268
        Send OPC-UA hello to server
269
        """
270 1
        ack = self.uaclient.send_hello(self.server_url.geturl())
271
        # FIXME check ack
272
273 1
    def open_secure_channel(self, renew=False):
274
        """
275
        Open secure channel, if renew is True, renew channel
276
        """
277 1
        params = ua.OpenSecureChannelParameters()
278 1
        params.ClientProtocolVersion = 0
279 1
        params.RequestType = ua.SecurityTokenRequestType.Issue
280 1
        if renew:
281
            params.RequestType = ua.SecurityTokenRequestType.Renew
282 1
        params.SecurityMode = self.security_policy.Mode
283 1
        params.RequestedLifetime = self.secure_channel_timeout
284 1
        nonce = utils.create_nonce(self.security_policy.symmetric_key_size)   # length should be equal to the length of key of symmetric encryption
285 1
        params.ClientNonce = nonce	# this nonce is used to create a symmetric key
286 1
        result = self.uaclient.open_secure_channel(params)
287 1
        self.security_policy.make_symmetric_key(nonce, result.ServerNonce)
288 1
        self.secure_channel_timeout = result.SecurityToken.RevisedLifetime
289
290 1
    def close_secure_channel(self):
291 1
        return self.uaclient.close_secure_channel()
292
293 1
    def get_endpoints(self):
294 1
        params = ua.GetEndpointsParameters()
295 1
        params.EndpointUrl = self.server_url.geturl()
296 1
        return self.uaclient.get_endpoints(params)
297
298 1
    def register_server(self, server, discovery_configuration=None):
299
        """
300
        register a server to discovery server
301
        if discovery_configuration is provided, the newer register_server2 service call is used
302
        """
303 1
        serv = ua.RegisteredServer()
304 1
        serv.ServerUri = server.application_uri
305 1
        serv.ProductUri = server.product_uri
306 1
        serv.DiscoveryUrls = [server.endpoint.geturl()]
307 1
        serv.ServerType = server.application_type
308 1
        serv.ServerNames = [ua.LocalizedText(server.name)]
309 1
        serv.IsOnline = True
310 1
        if discovery_configuration:
311
            params = ua.RegisterServer2Parameters()
312
            params.Server = serv
313
            params.DiscoveryConfiguration = discovery_configuration
314
            return self.uaclient.register_server2(params)
315
        else:
316 1
            return self.uaclient.register_server(serv)
317
318 1
    def find_servers(self, uris=None):
319
        """
320
        send a FindServer request to the server. The answer should be a list of
321
        servers the server knows about
322
        A list of uris can be provided, only server having matching uris will be returned
323
        """
324 1
        if uris is None:
325 1
            uris = []
326 1
        params = ua.FindServersParameters()
327 1
        params.EndpointUrl = self.server_url.geturl()
328 1
        params.ServerUris = uris
329 1
        return self.uaclient.find_servers(params)
330
331 1
    def find_servers_on_network(self):
332
        params = ua.FindServersOnNetworkParameters()
333
        return self.uaclient.find_servers_on_network(params)
334
335 1
    def create_session(self):
336
        """
337
        send a CreateSessionRequest to server with reasonable parameters.
338
        If you want o modify settings look at code of this methods
339
        and make your own
340
        """
341 1
        desc = ua.ApplicationDescription()
342 1
        desc.ApplicationUri = self.application_uri
343 1
        desc.ProductUri = self.product_uri
344 1
        desc.ApplicationName = ua.LocalizedText(self.name)
345 1
        desc.ApplicationType = ua.ApplicationType.Client
346
347 1
        params = ua.CreateSessionParameters()
348 1
        nonce = utils.create_nonce(32)  # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
349 1
        params.ClientNonce = nonce
350 1
        params.ClientCertificate = self.security_policy.client_certificate
351 1
        params.ClientDescription = desc
352 1
        params.EndpointUrl = self.server_url.geturl()
353 1
        params.SessionName = self.description + " Session" + str(self._session_counter)
354 1
        params.RequestedSessionTimeout = 3600000
355 1
        params.MaxResponseMessageSize = 0  # means no max size
356 1
        response = self.uaclient.create_session(params)
357 1
        if self.security_policy.client_certificate is None:
358 1
            data = nonce
359
        else:
360 1
            data = self.security_policy.client_certificate + nonce
361 1
        self.security_policy.asymmetric_cryptography.verify(data, response.ServerSignature.Signature)
362 1
        self._server_nonce = response.ServerNonce
363 1
        if not self.security_policy.server_certificate:
364 1
            self.security_policy.server_certificate = response.ServerCertificate
365 1
        elif self.security_policy.server_certificate != response.ServerCertificate:
366
            raise ua.UaError("Server certificate mismatch")
367
        # remember PolicyId's: we will use them in activate_session()
368 1
        ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
369 1
        self._policy_ids = ep.UserIdentityTokens
370 1
        self.session_timeout = response.RevisedSessionTimeout
371 1
        self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7)  # 0.7 is from spec
372 1
        self.keepalive.start()
373 1
        return response
374
375 1
    def server_policy_id(self, token_type, default):
376
        """
377
        Find PolicyId of server's UserTokenPolicy by token_type.
378
        Return default if there's no matching UserTokenPolicy.
379
        """
380 1
        for policy in self._policy_ids:
381 1
            if policy.TokenType == token_type:
382 1
                return policy.PolicyId
383
        return default
384
385 1
    def server_policy_uri(self, token_type):
386
        """
387
        Find SecurityPolicyUri of server's UserTokenPolicy by token_type.
388
        If SecurityPolicyUri is empty, use default SecurityPolicyUri
389
        of the endpoint
390
        """
391 1
        for policy in self._policy_ids:
392 1
            if policy.TokenType == token_type:
393 1
                if policy.SecurityPolicyUri:
394
                    return policy.SecurityPolicyUri
395
                else:   # empty URI means "use this endpoint's policy URI"
396 1
                    return self.security_policy.URI
397
        return self.security_policy.URI
398
399 1
    def activate_session(self, username=None, password=None, certificate=None):
400
        """
401
        Activate session using either username and password or private_key
402
        """
403 1
        params = ua.ActivateSessionParameters()
404 1
        challenge = b""
405 1
        if self.security_policy.server_certificate is not None:
406 1
            challenge += self.security_policy.server_certificate
407 1
        if self._server_nonce is not None:
408 1
            challenge += self._server_nonce
409 1
        params.ClientSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
410 1
        params.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(challenge)
411 1
        params.LocaleIds.append("en")
412 1
        if not username and not certificate:
413 1
            self._add_anonymous_auth(params)
414 1
        elif certificate:
415
            self._add_certificate_auth(params, certificate, challenge)
416
        else:
417 1
            self._add_user_auth(params, username, password)
418 1
        return self.uaclient.activate_session(params)
419
420 1
    def _add_anonymous_auth(self, params):
421 1
        params.UserIdentityToken = ua.AnonymousIdentityToken()
422 1
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, b"anonymous")
423
424 1
    def _add_certificate_auth(self, params, certificate, challenge):
425
        params.UserIdentityToken = ua.X509IdentityToken()
426
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, b"certificate_basic256")
427
        params.UserIdentityToken.CertificateData = uacrypto.der_from_x509(certificate)
428
        # specs part 4, 5.6.3.1: the data to sign is created by appending
429
        # the last serverNonce to the serverCertificate
430
        sig = uacrypto.sign_sha1(self.user_private_key, challenge)
431
        params.UserTokenSignature = ua.SignatureData()
432
        params.UserTokenSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
433
        params.UserTokenSignature.Signature = sig
434
435 1
    def _add_user_auth(self, params, username, password):
436 1
        params.UserIdentityToken = ua.UserNameIdentityToken()
437 1
        params.UserIdentityToken.UserName = username
438 1
        policy_uri = self.server_policy_uri(ua.UserTokenType.UserName)
439 1
        if not policy_uri or policy_uri == security_policies.POLICY_NONE_URI:
440
            # see specs part 4, 7.36.3: if the token is NOT encrypted,
441
            # then the password only contains UTF-8 encoded password
442
            # and EncryptionAlgorithm is null
443 1
            if self._password:
444
                self.logger.warning("Sending plain-text password")
445
                params.UserIdentityToken.Password = password
446 1
            params.UserIdentityToken.EncryptionAlgorithm = ''
447
        elif self._password:
448
            data, uri = self._encrypt_password(password, policy_uri)
449
            params.UserIdentityToken.Password = data
450
            params.UserIdentityToken.EncryptionAlgorithm = uri
451 1
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, b"username_basic256")
452
453 1
    def _encrypt_password(self, password, policy_uri):
454
        pubkey = uacrypto.x509_from_der(self.security_policy.server_certificate).public_key()
455
        # see specs part 4, 7.36.3: if the token is encrypted, password
456
        # shall be converted to UTF-8 and serialized with server nonce
457
        passwd = password.encode("utf8")
458
        if self._server_nonce is not None:
459
            passwd += self._server_nonce
460
        etoken = ua.ua_binary.Primitives.Bytes.pack(passwd)
461
        data, uri = security_policies.encrypt_asymmetric(pubkey, etoken, policy_uri)
462
        return data, uri
463
464 1
    def close_session(self):
465
        """
466
        Close session
467
        """
468 1
        if self.keepalive:
469 1
            self.keepalive.stop()
470 1
        return self.uaclient.close_session(True)
471
472 1
    def get_root_node(self):
473 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))
474
475 1
    def get_objects_node(self):
476 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))
477
478 1
    def get_server_node(self):
479 1
        return self.get_node(ua.FourByteNodeId(ua.ObjectIds.Server))
480
481 1
    def get_node(self, nodeid):
482
        """
483
        Get node using NodeId object or a string representing a NodeId
484
        """
485 1
        return Node(self.uaclient, nodeid)
486
487 1
    def create_subscription(self, period, handler):
488
        """
489
        Create a subscription.
490
        returns a Subscription object which allow
491
        to subscribe to events or data on server
492
        handler argument is a class with data_change and/or event methods.
493
        period argument is either a publishing interval in seconds or a
494
        CreateSubscriptionParameters instance. The second option should be used,
495
        if the opcua-server has problems with the default options.
496
        These methods will be called when notfication from server are received.
497
        See example-client.py.
498
        Do not do expensive/slow or network operation from these methods
499
        since they are called directly from receiving thread. This is a design choice,
500
        start another thread if you need to do such a thing.
501
        """
502
503 1
        if isinstance(period, ua.CreateSubscriptionParameters):
504
             return Subscription(self.uaclient, period, handler)
505 1
        params = ua.CreateSubscriptionParameters()
506 1
        params.RequestedPublishingInterval = period
507 1
        params.RequestedLifetimeCount = 10000
508 1
        params.RequestedMaxKeepAliveCount = 3000
509 1
        params.MaxNotificationsPerPublish = 10000
510 1
        params.PublishingEnabled = True
511 1
        params.Priority = 0
512 1
        return Subscription(self.uaclient, params, handler)
513
514 1
    def get_namespace_array(self):
515 1
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
516 1
        return ns_node.get_value()
517
518 1
    def get_namespace_index(self, uri):
519 1
        uries = self.get_namespace_array()
520 1
        return uries.index(uri)
521
522 1
    def delete_nodes(self, nodes, recursive=False):
523 1
        return delete_nodes(self.uaclient, nodes, recursive)
524
525 1
    def import_xml(self, path):
526
        """
527
        Import nodes defined in xml
528
        """
529 1
        importer = XmlImporter(self)
530 1
        return importer.import_xml(path)
531
532 1
    def export_xml(self, nodes, path):
533
        """
534
        Export defined nodes to xml
535
        """
536 1
        exp = XmlExporter(self)
537 1
        exp.build_etree(nodes)
538 1
        return exp.write_xml(path)
539
540 1
    def register_namespace(self, uri):
541
        """
542
        Register a new namespace. Nodes should in custom namespace, not 0.
543
        This method is mainly implemented for symetry with server
544
        """
545 1
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
546 1
        uries = ns_node.get_value()
547 1
        if uri in uries:
548 1
            return uries.index(uri)
549 1
        uries.append(uri)
550 1
        ns_node.set_value(uries)
551 1
        return len(uries) - 1
552
553
554