Completed
Push — master ( 41c04b...6083f5 )
by Olivier
03:26
created

Client.open_secure_channel()   A

Complexity

Conditions 2

Size

Total Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 2.0017

Importance

Changes 0
Metric Value
cc 2
c 0
b 0
f 0
dl 0
loc 16
ccs 12
cts 13
cp 0.9231
crap 2.0017
rs 9.4285
1 1
from __future__ import division  # support for python2
2 1
from threading import Thread, Condition
3 1
import logging
4 1
try:
5 1
    from urllib.parse import urlparse
6
except ImportError:  # support for python2
7
    from urlparse import urlparse
8
9 1
from opcua import ua
10 1
from opcua.client.ua_client import UaClient
11 1
from opcua.common.node import Node
12 1
from opcua.common.manage_nodes import delete_nodes
13 1
from opcua.common.subscription import Subscription
14 1
from opcua.common import utils
15 1
from opcua.crypto import security_policies
16 1
from opcua.common.shortcuts import Shortcuts
17 1
use_crypto = True
18 1
try:
19 1
    from opcua.crypto import uacrypto
20
except ImportError:
21
    print("cryptography is not installed, use of crypto disabled")
22
    use_crypto = False
23
24
25 1
class KeepAlive(Thread):

    """
    Used by Client to keep the session open.
    OPCUA defines timeout both for sessions and secure channel

    The thread wakes up once per period, renews the secure channel of
    the client and reads the server state node, until stop() is called.
    """

    def __init__(self, client, timeout):
        """
        :param client: object providing get_node() and
            open_secure_channel(renew=True), normally a Client
        :param timeout: period between two renewals, in milliseconds.
            A value of 0 is replaced by one hour.
        """
        Thread.__init__(self)
        self.logger = logging.getLogger(__name__)

        self.client = client
        self._dostop = False
        self._cond = Condition()
        self.timeout = timeout

        # some server support no timeout, but we do not trust them
        if self.timeout == 0:
            self.timeout = 3600000  # 1 hour

    def run(self):
        """
        Thread body: sleep one period, renew the channel, read the
        server state; repeat until stop() has been called.
        """
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
        server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
        while True:
            with self._cond:
                # The stop flag is tested while holding the lock: a stop()
                # issued just before we start waiting can then not be
                # missed (it would otherwise cost a full period of sleep).
                if not self._dostop:
                    # timeout is in milliseconds, wait() expects seconds
                    self._cond.wait(self.timeout / 1000.0)
                if self._dostop:
                    break
            self.logger.debug("renewing channel")
            self.client.open_secure_channel(renew=True)
            val = server_state.get_value()
            self.logger.debug("server state is: %s ", val)
        self.logger.debug("keepalive thread has stopped")

    def stop(self):
        """
        Ask the thread to terminate and wake it up if it is sleeping.
        Does not wait for the thread to finish.
        """
        self.logger.debug("stoping keepalive thread")
        with self._cond:
            # flag and notification are published atomically for run()
            self._dostop = True
            self._cond.notify_all()
68
69
70 1
class Client(object):
71
72
    """
73
    High level client to connect to an OPC-UA server.
74
75
    This class makes it easy to connect and browse address space.
76
    It attempts to expose as much functionality as possible
77
    but if you want more flexibility it is possible and advised to
78
    use UaClient object, available as self.uaclient
79
    which offers the raw OPC-UA services interface.
80
    """
81
82 1
    def __init__(self, url, timeout=4):
        """
        Store the connection settings and create the underlying UaClient.

        :param url: url of the server.
            if you are unsure of url, write at least hostname
            and port and call get_endpoints

        :param timeout:
            Each request sent to the server expects an answer within this
            time. The timeout is specified in seconds.
        """
        self.logger = logging.getLogger(__name__)
        self.server_url = urlparse(url)
        self.name = "Pure Python Client"
        self.description = self.name
        self.application_uri = "urn:freeopcua:client"
        self.product_uri = "urn:freeopcua.github.no:client"
        self.security_policy = ua.SecurityPolicy()
        self.secure_channel_id = None
        self.secure_channel_timeout = 3600000  # 1 hour
        self.session_timeout = 3600000  # 1 hour
        self._policy_ids = []
        self.uaclient = UaClient(timeout)
        self.user_certificate = None
        self.user_private_key = None
        self._session_counter = 1
        # Overwritten by create_session(). Defined here so that
        # activate_session() can test it against None instead of failing
        # with AttributeError when no session has been created yet.
        self._server_nonce = None
        self.keepalive = None
        self.nodes = Shortcuts(self.uaclient)
110
111 1
    def __enter__(self):
        """
        Context manager entry: connect to the server and return the client
        """
        self.connect()
        return self
114
115 1
    def __exit__(self, exc_type, exc_value, traceback):
        """
        Context manager exit: disconnect, whether or not an exception occurred
        """
        self.disconnect()
117
118 1
    @staticmethod
    def find_endpoint(endpoints, security_mode, policy_uri):
        """
        Return the first opc.tcp endpoint of the list which has the
        requested security mode and security policy URI.
        Raise ua.UaError when no endpoint matches.
        """
        for candidate in endpoints:
            is_tcp = candidate.EndpointUrl.startswith(ua.OPC_TCP_SCHEME)
            if (is_tcp and
                    candidate.SecurityMode == security_mode and
                    candidate.SecurityPolicyUri == policy_uri):
                return candidate
        message = "No matching endpoints: {}, {}".format(security_mode, policy_uri)
        raise ua.UaError(message)
130
131 1
    def set_security_string(self, string):
        """
        Set SecureConnection mode from a single string. String format:
        Policy,Mode,certificate,private_key[,server_certificate]
        where Policy is the suffix of a SecurityPolicy class of
            opcua.crypto.security_policies (Basic128Rsa15 or Basic256),
            Mode is a member of ua.MessageSecurityMode
            (Sign or SignAndEncrypt),
            certificate, private_key and server_certificate are
                paths to .pem or .der files
        An empty string is ignored.
        Call this before connect()
        """
        if not string:
            return
        fields = string.split(',')
        if len(fields) < 4:
            raise ua.UaError('Wrong format: `{}`, expected at least 4 comma-separated values'.format(string))
        policy_name, mode_name, certificate_path, private_key_path = fields[:4]
        # the fifth value is optional
        server_certificate_path = fields[4] if len(fields) >= 5 else None
        policy_class = getattr(security_policies, 'SecurityPolicy' + policy_name)
        mode = getattr(ua.MessageSecurityMode, mode_name)
        return self.set_security(policy_class, certificate_path, private_key_path,
                                 server_certificate_path, mode)
150
151 1
    def set_security(self, policy, certificate_path, private_key_path,
                     server_certificate_path=None,
                     mode=ua.MessageSecurityMode.SignAndEncrypt):
        """
        Set SecureConnection mode.
        When no server certificate path is given, the certificate is taken
        from the matching endpoint advertised by the server, which
        requires a temporary connection.
        Call this before connect()
        """
        if server_certificate_path is not None:
            server_cert = uacrypto.load_certificate(server_certificate_path)
        else:
            # load certificate from server's list of endpoints
            available = self.connect_and_get_server_endpoints()
            matching = Client.find_endpoint(available, mode, policy.URI)
            server_cert = uacrypto.x509_from_der(matching.ServerCertificate)
        client_cert = uacrypto.load_certificate(certificate_path)
        private_key = uacrypto.load_private_key(private_key_path)
        self.security_policy = policy(server_cert, client_cert, private_key, mode)
        self.uaclient.set_security(self.security_policy)
169
170 1
    def load_client_certificate(self, path):
        """
        Load the user certificate from a pem or der file
        and keep it in self.user_certificate
        """
        loaded = uacrypto.load_certificate(path)
        self.user_certificate = loaded
175
176 1
    def load_private_key(self, path):
        """
        Load the user private key from file and keep it in
        self.user_private_key. It is used for authenticating using certificate
        """
        loaded = uacrypto.load_private_key(path)
        self.user_private_key = loaded
181
182 1
    def connect_and_get_server_endpoints(self):
        """
        Connect, ask server for endpoints, and disconnect

        :return: the result of get_endpoints()
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            endpoints = self.get_endpoints()
            self.close_secure_channel()
        finally:
            # never leave the socket open, even when a request failed
            self.disconnect_socket()
        return endpoints
193
194 1
    def connect_and_find_servers(self):
        """
        Connect, ask server for a list of known servers, and disconnect

        :return: the result of find_servers()
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()  # spec says it should not be necessary to open channel
            servers = self.find_servers()
            self.close_secure_channel()
        finally:
            # never leave the socket open, even when a request failed
            self.disconnect_socket()
        return servers
205
206 1
    def connect_and_find_servers_on_network(self):
        """
        Connect, ask server for a list of known servers on network, and disconnect

        :return: the result of find_servers_on_network()
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            servers = self.find_servers_on_network()
            self.close_secure_channel()
        finally:
            # never leave the socket open, even when a request failed
            self.disconnect_socket()
        return servers
217
218 1
    def connect(self):
        """
        High level method
        Connect, create and activate session

        The session is activated with the username and password found in
        the server url and with self.user_certificate.
        If the hello, the channel opening or the session creation fails,
        the socket is closed again before the exception is propagated.
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            self.create_session()
        except Exception:
            self.disconnect_socket()  # clean up the socket we just opened
            raise
        self.activate_session(username=self.server_url.username, password=self.server_url.password, certificate=self.user_certificate)
228
229 1
    def disconnect(self):
        """
        High level method
        Close session, secure channel and socket

        The socket is closed even when closing the session or the
        secure channel raises; the exception is then propagated.
        """
        try:
            self.close_session()
            self.close_secure_channel()
        finally:
            self.disconnect_socket()
237
238 1
    def connect_socket(self):
        """
        Open the socket to the host and port given in the server url
        """
        target = self.server_url
        self.uaclient.connect_socket(target.hostname, target.port)
243
244 1
    def disconnect_socket(self):
        """
        Ask the underlying UaClient to disconnect its socket
        """
        self.uaclient.disconnect_socket()
246
247 1
    def send_hello(self):
        """
        Send OPC-UA hello to server, using the server url
        """
        # FIXME the acknowledge returned by the server is not checked
        self.uaclient.send_hello(self.server_url.geturl())
253
254 1
    def open_secure_channel(self, renew=False):
        """
        Open secure channel, if renew is True, renew channel

        The symmetric keys of the security policy are derived from the
        client and server nonces, and self.secure_channel_timeout is
        updated with the lifetime revised by the server.
        """
        if renew:
            request_type = ua.SecurityTokenRequestType.Renew
        else:
            request_type = ua.SecurityTokenRequestType.Issue
        # length should be equal to the length of key of symmetric encryption
        client_nonce = utils.create_nonce(self.security_policy.symmetric_key_size)

        params = ua.OpenSecureChannelParameters()
        params.ClientProtocolVersion = 0
        params.RequestType = request_type
        params.SecurityMode = self.security_policy.Mode
        params.RequestedLifetime = self.secure_channel_timeout
        params.ClientNonce = client_nonce  # this nonce is used to create a symmetric key

        answer = self.uaclient.open_secure_channel(params)
        self.security_policy.make_symmetric_key(client_nonce, answer.ServerNonce)
        self.secure_channel_timeout = answer.SecurityToken.RevisedLifetime
270
271 1
    def close_secure_channel(self):
        """
        Close the secure channel; returns whatever UaClient returns
        """
        result = self.uaclient.close_secure_channel()
        return result
273
274 1
    def get_endpoints(self):
        """
        Send a GetEndpoints request for the server url and return the answer
        """
        request = ua.GetEndpointsParameters()
        request.EndpointUrl = self.server_url.geturl()
        return self.uaclient.get_endpoints(request)
278
279 1
    def register_server(self, server, discovery_configuration=None):
        """
        register a server to discovery server
        if discovery_configuration is provided, the newer register_server2 service call is used

        server must provide application_uri, product_uri, endpoint,
        application_type and name attributes.
        """
        registered = ua.RegisteredServer()
        registered.ServerUri = server.application_uri
        registered.ProductUri = server.product_uri
        registered.DiscoveryUrls = [server.endpoint.geturl()]
        registered.ServerType = server.application_type
        registered.ServerNames = [ua.LocalizedText(server.name)]
        registered.IsOnline = True
        if not discovery_configuration:
            return self.uaclient.register_server(registered)
        request = ua.RegisterServer2Parameters()
        request.Server = registered
        request.DiscoveryConfiguration = discovery_configuration
        return self.uaclient.register_server2(request)
298
299 1
    def find_servers(self, uris=None):
        """
        send a FindServer request to the server. The answer should be a list of
        servers the server knows about
        A list of uris can be provided, only server having matching uris will be returned
        """
        request = ua.FindServersParameters()
        request.EndpointUrl = self.server_url.geturl()
        # no uris given: send an empty list
        request.ServerUris = [] if uris is None else uris
        return self.uaclient.find_servers(request)
311
312 1
    def find_servers_on_network(self):
        """
        Send a FindServersOnNetwork request with default parameters
        and return the answer
        """
        request = ua.FindServersOnNetworkParameters()
        return self.uaclient.find_servers_on_network(request)
315
316 1
    def create_session(self):
        """
        send a CreateSessionRequest to server with reasonable parameters.
        If you want to modify settings look at code of this method
        and make your own

        The requested session timeout is self.session_timeout
        (in milliseconds), which is then replaced by the value revised
        by the server. On success a KeepAlive thread is started.

        :return: the response of UaClient.create_session
        :raises ua.UaError: if the server certificate differs from the
            one already known, or if no endpoint of the response matches
            the current security mode and policy
        """
        desc = ua.ApplicationDescription()
        desc.ApplicationUri = self.application_uri
        desc.ProductUri = self.product_uri
        desc.ApplicationName = ua.LocalizedText(self.name)
        desc.ApplicationType = ua.ApplicationType.Client

        params = ua.CreateSessionParameters()
        nonce = utils.create_nonce(32)  # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
        params.ClientNonce = nonce
        params.ClientCertificate = self.security_policy.client_certificate
        params.ClientDescription = desc
        params.EndpointUrl = self.server_url.geturl()
        params.SessionName = self.description + " Session" + str(self._session_counter)
        # honour the configurable attribute instead of a hard-coded hour
        params.RequestedSessionTimeout = self.session_timeout
        params.MaxResponseMessageSize = 0  # means no max size
        response = self.uaclient.create_session(params)
        # the server signs our certificate (if any) followed by our nonce
        if self.security_policy.client_certificate is None:
            data = nonce
        else:
            data = self.security_policy.client_certificate + nonce
        self.security_policy.asymmetric_cryptography.verify(data, response.ServerSignature.Signature)
        self._server_nonce = response.ServerNonce
        if not self.security_policy.server_certificate:
            self.security_policy.server_certificate = response.ServerCertificate
        elif self.security_policy.server_certificate != response.ServerCertificate:
            raise ua.UaError("Server certificate mismatch")
        # remember PolicyId's: we will use them in activate_session()
        ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
        self._policy_ids = ep.UserIdentityTokens
        self.session_timeout = response.RevisedSessionTimeout
        self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7)  # 0.7 is from spec
        self.keepalive.start()
        return response
355
356 1
    def server_policy_id(self, token_type, default):
        """
        Return the PolicyId of the first UserTokenPolicy of the server
        having the given token_type, or default if there is none.
        """
        matching_ids = (policy.PolicyId
                        for policy in self._policy_ids
                        if policy.TokenType == token_type)
        return next(matching_ids, default)
365
366 1
    def server_policy_uri(self, token_type):
        """
        Return the SecurityPolicyUri of the first UserTokenPolicy of the
        server having the given token_type.
        If there is no such policy, or if its SecurityPolicyUri is empty,
        the SecurityPolicyUri of the endpoint is returned.
        """
        for token_policy in self._policy_ids:
            if token_policy.TokenType == token_type:
                # empty URI means "use this endpoint's policy URI"
                return token_policy.SecurityPolicyUri or self.security_policy.URI
        return self.security_policy.URI
379
380 1
    def activate_session(self, username=None, password=None, certificate=None):
        """
        Activate session using either username and password or private_key

        A certificate takes precedence over a username; without any of
        them the session is activated anonymously.
        """
        # data to sign: server certificate followed by last server nonce
        pieces = [self.security_policy.server_certificate, self._server_nonce]
        challenge = b""
        for piece in pieces:
            if piece is not None:
                challenge += piece

        params = ua.ActivateSessionParameters()
        params.ClientSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(challenge)
        params.LocaleIds.append("en")
        if certificate:
            self._add_certificate_auth(params, certificate, challenge)
        elif username:
            self._add_user_auth(params, username, password)
        else:
            self._add_anonymous_auth(params)
        return self.uaclient.activate_session(params)
400
401 1
    def _add_anonymous_auth(self, params):
        """
        Put an anonymous identity token into params
        """
        token = ua.AnonymousIdentityToken()
        params.UserIdentityToken = token
        token.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, b"anonymous")
404
405 1
    def _add_certificate_auth(self, params, certificate, challenge):
        """
        Put a X509 identity token into params, together with the
        signature of challenge made with self.user_private_key
        """
        token = ua.X509IdentityToken()
        params.UserIdentityToken = token
        token.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, b"certificate_basic256")
        token.CertificateData = uacrypto.der_from_x509(certificate)
        # specs part 4, 5.6.3.1: the data to sign is created by appending
        # the last serverNonce to the serverCertificate
        signature = uacrypto.sign_sha1(self.user_private_key, challenge)
        signature_data = ua.SignatureData()
        params.UserTokenSignature = signature_data
        signature_data.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        signature_data.Signature = signature
415
416 1
    def _add_user_auth(self, params, username, password):
        """
        Put a username identity token into params.

        The password is sent in plain text when the user token policy of
        the server has no security (or security policy None), and
        encrypted with the server certificate otherwise. No password is
        put in the token when password is empty or None.

        :param params: ActivateSessionParameters to fill in
        :param username: name of the user
        :param password: password of the user, or None
        """
        params.UserIdentityToken = ua.UserNameIdentityToken()
        params.UserIdentityToken.UserName = username
        policy_uri = self.server_policy_uri(ua.UserTokenType.UserName)
        # The decision is made on the password argument, not on the
        # password of the server url: a password handed to
        # activate_session() must be sent even if the url has none.
        if not policy_uri or policy_uri == security_policies.POLICY_NONE_URI:
            # see specs part 4, 7.36.3: if the token is NOT encrypted,
            # then the password only contains UTF-8 encoded password
            # and EncryptionAlgorithm is null
            if password:
                self.logger.warning("Sending plain-text password")
                params.UserIdentityToken.Password = password
            params.UserIdentityToken.EncryptionAlgorithm = ''
        elif password:
            data, uri = self._encrypt_password(password, policy_uri)
            params.UserIdentityToken.Password = data
            params.UserIdentityToken.EncryptionAlgorithm = uri
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, b"username_basic256")
433
434 1
    def _encrypt_password(self, password, policy_uri):
        """
        Encrypt password with the public key of the server certificate.
        Returns the pair returned by security_policies.encrypt_asymmetric
        (encrypted data, encryption algorithm uri)
        """
        server_cert = uacrypto.x509_from_der(self.security_policy.server_certificate)
        public_key = server_cert.public_key()
        # see specs part 4, 7.36.3: if the token is encrypted, password
        # shall be converted to UTF-8 and serialized with server nonce
        secret = password.encode("utf8")
        if self._server_nonce is not None:
            secret += self._server_nonce
        packed = ua.ua_binary.Primitives.Bytes.pack(secret)
        encrypted, algorithm_uri = security_policies.encrypt_asymmetric(public_key, packed, policy_uri)
        return encrypted, algorithm_uri
444
445 1
    def close_session(self):
        """
        Stop the keepalive thread, if any, and close the session
        """
        keepalive = self.keepalive
        if keepalive:
            keepalive.stop()
        return self.uaclient.close_session(True)
452
453 1
    def get_root_node(self):
        """
        Get the Node object of the root folder
        """
        root_id = ua.TwoByteNodeId(ua.ObjectIds.RootFolder)
        return self.get_node(root_id)
455
456 1
    def get_objects_node(self):
        """
        Get the Node object of the objects folder
        """
        objects_id = ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder)
        return self.get_node(objects_id)
458
459 1
    def get_server_node(self):
        """
        Get the Node object of the Server object
        """
        server_id = ua.FourByteNodeId(ua.ObjectIds.Server)
        return self.get_node(server_id)
461
462 1
    def get_node(self, nodeid):
        """
        Build a Node object bound to this client
        from a NodeId object or a string representing a NodeId
        """
        node = Node(self.uaclient, nodeid)
        return node
467
468 1
    def create_subscription(self, period, handler):
        """
        Create a subscription.
        returns a Subscription object which allow
        to subscribe to events or data on server
        period is used as the requested publishing interval.
        handler argument is a class with data_change and/or event methods.
        These methods will be called when notification from server are received.
        See example-client.py.
        Do not do expensive/slow or network operation from these methods
        since they are called directly from receiving thread. This is a design choice,
        start another thread if you need to do such a thing.
        """
        settings = (
            ("RequestedPublishingInterval", period),
            ("RequestedLifetimeCount", 3000),
            ("RequestedMaxKeepAliveCount", 10000),
            ("MaxNotificationsPerPublish", 10000),
            ("PublishingEnabled", True),
            ("Priority", 0),
        )
        params = ua.CreateSubscriptionParameters()
        for field, value in settings:
            setattr(params, field, value)
        return Subscription(self.uaclient, params, handler)
488
489 1
    def get_namespace_array(self):
        """
        Read the value of the Server_NamespaceArray node
        """
        array_id = ua.NodeId(ua.ObjectIds.Server_NamespaceArray)
        return self.get_node(array_id).get_value()
492
493 1
    def get_namespace_index(self, uri):
        """
        Return the position of uri in the namespace array of the server
        """
        return self.get_namespace_array().index(uri)
496
497 1
    def delete_nodes(self, nodes, recursive=False):
        """
        Delete nodes through the module level delete_nodes helper
        and return its result
        """
        outcome = delete_nodes(self.uaclient, nodes, recursive)
        return outcome
499
            
500