Completed
Push — master ( 6a68a0...bac760 )
by Olivier
02:36
created

opcua.client.Client   D

Complexity

Total Complexity 61

Size/Duplication

Total Lines 403
Duplicated Lines 0 %

Test Coverage

Coverage 70.76%
Metric Value
dl 0
loc 403
ccs 167
cts 236
cp 0.7076
rs 4.054
wmc 61

30 Methods

Rating   Name   Duplication   Size   Complexity  
A Client.server_policy_id() 0 9 3
A Client.load_client_certificate() 0 5 1
A Client.disconnect() 0 8 1
A Client.close_secure_channel() 0 2 1
A Client.connect_and_find_servers() 0 11 1
A Client.load_private_key() 0 5 1
A Client.set_security_string() 0 19 4
A Client.find_servers() 0 12 2
A Client.open_secure_channel() 0 16 2
A Client.get_endpoints() 0 4 1
A Client.disconnect_socket() 0 2 1
A Client.connect() 0 10 1
A Client.send_hello() 0 5 1
A Client.set_security() 0 18 2
A Client.find_servers_on_network() 0 3 1
A Client.connect_and_find_servers_on_network() 0 11 1
A Client.register_server() 0 19 2
B Client.find_endpoint() 0 12 5
B Client.create_session() 0 39 4
A Client.connect_socket() 0 5 1
A Client.connect_and_get_server_endpoints() 0 11 1
A Client.server_policy_uri() 0 13 4
A Client.get_namespace_array() 0 3 1
A Client.close_session() 0 7 2
A Client.create_subscription() 0 20 1
A Client.get_objects_node() 0 2 1
A Client.get_root_node() 0 2 1
A Client.get_node() 0 5 1
F Client.activate_session() 0 51 11
A Client.get_server_node() 0 2 1

How to fix   Complexity   

Complex Class

Complex classes like opcua.client.Client often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
from __future__ import division  # support for python2
2 1
from threading import Thread, Condition
3 1
import logging
4 1
try:
5 1
    from urllib.parse import urlparse
6
except ImportError:  # support for python2
7
    from urlparse import urlparse
8
9 1
from opcua import ua
10 1
from opcua.client.ua_client import UaClient
11 1
from opcua.common.node import Node
12 1
from opcua.common.manage_nodes import delete_nodes
13 1
from opcua.common.subscription import Subscription
14 1
from opcua.common import utils
15 1
from opcua.crypto import security_policies
16 1
use_crypto = True
17 1
try:
18 1
    from opcua.crypto import uacrypto
19 1
except ImportError:
20 1
    print("cryptography is not installed, use of crypto disabled")
21 1
    use_crypto = False
22
23
24 1
class KeepAlive(Thread):
25
26
    """
27
    Used by Client to keep session opened.
28
    OPCUA defines timeout both for sessions and secure channel
29
    """
30
31 1
    def __init__(self, client, timeout):
32 1
        Thread.__init__(self)
33 1
        self.logger = logging.getLogger(__name__)
34 1
        if timeout == 0:  # means no timeout bu we do not trust such servers
35
            timeout = 360000
36 1
        self.timeout = timeout
37 1
        self.client = client
38 1
        self._dostop = False
39 1
        self._cond = Condition()
40
41 1
    def run(self):
42 1
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
43 1
        server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
44 1
        while not self._dostop:
45 1
            with self._cond:
46 1
                self._cond.wait(self.timeout / 1000)
47 1
            if self._dostop:
48 1
                break
49
            self.logger.debug("renewing channel")
50
            self.client.open_secure_channel(renew=True)
51
            val = server_state.get_value()
52
            self.logger.debug("server state is: %s ", val)
53 1
        self.logger.debug("keepalive thread has stopped")
54
55 1
    def stop(self):
56 1
        self.logger.debug("stoping keepalive thread")
57 1
        self._dostop = True
58 1
        with self._cond:
59 1
            self._cond.notify_all()
60
61
62 1
class Client(object):
63
64
    """
65
    High level client to connect to an OPC-UA server.
66
67
    This class makes it easy to connect and browse address space.
68
    It attemps to expose as much functionality as possible
69
    but if you want more flexibility it is possible and adviced to
70
    use UaClient object, available as self.uaclient
71
    which offers the raw OPC-UA services interface.
72
    """
73
74 1
    def __init__(self, url, timeout=4):
75
        """
76
        used url argument to connect to server.
77
        if you are unsure of url, write at least hostname and port
78
        and call get_endpoints
79
        timeout is the timeout to get an answer for requests to server
80
        public member of this call are available to be set by API users
81
82
        """
83 1
        self.logger = logging.getLogger(__name__)
84 1
        self.server_url = urlparse(url)
85 1
        self.name = "Pure Python Client"
86 1
        self.description = self.name
87 1
        self.application_uri = "urn:freeopcua:client"
88 1
        self.product_uri = "urn:freeopcua.github.no:client"
89 1
        self.security_policy = ua.SecurityPolicy()
90 1
        self.secure_channel_id = None
91 1
        self.default_timeout = 3600000
92 1
        self.secure_channel_timeout = self.default_timeout
93 1
        self.session_timeout = self.default_timeout
94 1
        self._policy_ids = []
95 1
        self.uaclient = UaClient(timeout)
96 1
        self.user_certificate = None
97 1
        self.user_private_key = None
98 1
        self._session_counter = 1
99 1
        self.keepalive = None
100
101 1
    @staticmethod
102
    def find_endpoint(endpoints, security_mode, policy_uri):
103
        """
104
        Find endpoint with required security mode and policy URI
105
        """
106 1
        for ep in endpoints:
107 1
            if (ep.EndpointUrl.startswith(ua.OPC_TCP_SCHEME) and
108
                    ep.SecurityMode == security_mode and
109
                    ep.SecurityPolicyUri == policy_uri):
110 1
                return ep
111
        raise ua.UaError("No matching endpoints: {}, {}".format(
112
                         security_mode, policy_uri))
113
114 1
    def set_security_string(self, string):
115
        """
116
        Set SecureConnection mode. String format:
117
        Policy,Mode,certificate,private_key[,server_private_key]
118
        where Policy is Basic128Rsa15 or Basic256,
119
            Mode is Sign or SignAndEncrypt
120
            certificate, private_key and server_private_key are
121
                paths to .pem or .der files
122
        Call this before connect()
123
        """
124
        if not string:
125
            return
126
        parts = string.split(',')
127
        if len(parts) < 4:
128
            raise ua.UaError('Wrong format: `{}`, expected at least 4 comma-separated values'.format(string))
129
        policy_class = getattr(security_policies, 'SecurityPolicy' + parts[0])
130
        mode = getattr(ua.MessageSecurityMode, parts[1])
131
        return self.set_security(policy_class, parts[2], parts[3],
132
                                 parts[4] if len(parts) >= 5 else None, mode)
133
134 1
    def set_security(self, policy, certificate_path, private_key_path,
135
                     server_certificate_path=None,
136
                     mode=ua.MessageSecurityMode.SignAndEncrypt):
137
        """
138
        Set SecureConnection mode.
139
        Call this before connect()
140
        """
141
        if server_certificate_path is None:
142
            # load certificate from server's list of endpoints
143
            endpoints = self.connect_and_get_server_endpoints()
144
            endpoint = Client.find_endpoint(endpoints, mode, policy.URI)
145
            server_cert = uacrypto.x509_from_der(endpoint.ServerCertificate)
146
        else:
147
            server_cert = uacrypto.load_certificate(server_certificate_path)
148
        cert = uacrypto.load_certificate(certificate_path)
149
        pk = uacrypto.load_private_key(private_key_path)
150
        self.security_policy = policy(server_cert, cert, pk, mode)
151
        self.uaclient.set_security(self.security_policy)
152
153 1
    def load_client_certificate(self, path):
154
        """
155
        load our certificate from file, either pem or der
156
        """
157
        self.user_certificate = uacrypto.load_certificate(path)
158
159 1
    def load_private_key(self, path):
160
        """
161
        Load user private key. This is used for authenticating using certificate
162
        """
163
        self.user_private_key = uacrypto.load_private_key(path)
164
165 1
    def connect_and_get_server_endpoints(self):
166
        """
167
        Connect, ask server for endpoints, and disconnect
168
        """
169
        self.connect_socket()
170
        self.send_hello()
171
        self.open_secure_channel()
172
        endpoints = self.get_endpoints()
173
        self.close_secure_channel()
174
        self.disconnect_socket()
175
        return endpoints
176
177 1
    def connect_and_find_servers(self):
178
        """
179
        Connect, ask server for a list of known servers, and disconnect
180
        """
181
        self.connect_socket()
182
        self.send_hello()
183
        self.open_secure_channel()  # spec says it should not be necessary to open channel
184
        servers = self.find_servers()
185
        self.close_secure_channel()
186
        self.disconnect_socket()
187
        return servers
188
189 1
    def connect_and_find_servers_on_network(self):
190
        """
191
        Connect, ask server for a list of known servers on network, and disconnect
192
        """
193
        self.connect_socket()
194
        self.send_hello()
195
        self.open_secure_channel()
196
        servers = self.find_servers_on_network()
197
        self.close_secure_channel()
198
        self.disconnect_socket()
199
        return servers
200
201 1
    def connect(self):
202
        """
203
        High level method
204
        Connect, create and activate session
205
        """
206 1
        self.connect_socket()
207 1
        self.send_hello()
208 1
        self.open_secure_channel()
209 1
        self.create_session()
210 1
        self.activate_session(username=self.server_url.username, password=self.server_url.password, certificate=self.user_certificate)
211
212 1
    def disconnect(self):
213
        """
214
        High level method
215
        Close session, secure channel and socket
216
        """
217 1
        self.close_session()
218 1
        self.close_secure_channel()
219 1
        self.disconnect_socket()
220
221 1
    def connect_socket(self):
222
        """
223
        connect to socket defined in url
224
        """
225 1
        self.uaclient.connect_socket(self.server_url.hostname, self.server_url.port)
226
227 1
    def disconnect_socket(self):
228 1
        self.uaclient.disconnect_socket()
229
230 1
    def send_hello(self):
231
        """
232
        Send OPC-UA hello to server
233
        """
234 1
        ack = self.uaclient.send_hello(self.server_url.geturl())
235
        # FIXME check ack
236
237 1
    def open_secure_channel(self, renew=False):
238
        """
239
        Open secure channel, if renew is True, renew channel
240
        """
241 1
        params = ua.OpenSecureChannelParameters()
242 1
        params.ClientProtocolVersion = 0
243 1
        params.RequestType = ua.SecurityTokenRequestType.Issue
244 1
        if renew:
245
            params.RequestType = ua.SecurityTokenRequestType.Renew
246 1
        params.SecurityMode = self.security_policy.Mode
247 1
        params.RequestedLifetime = self.secure_channel_timeout
248 1
        nonce = utils.create_nonce(self.security_policy.symmetric_key_size)   # length should be equal to the length of key of symmetric encryption
249 1
        params.ClientNonce = nonce	# this nonce is used to create a symmetric key
250 1
        result = self.uaclient.open_secure_channel(params)
251 1
        self.security_policy.make_symmetric_key(nonce, result.ServerNonce)
252 1
        self.secure_channel_timeout = result.SecurityToken.RevisedLifetime
253
254 1
    def close_secure_channel(self):
255 1
        return self.uaclient.close_secure_channel()
256
257 1
    def get_endpoints(self):
258 1
        params = ua.GetEndpointsParameters()
259 1
        params.EndpointUrl = self.server_url.geturl()
260 1
        return self.uaclient.get_endpoints(params)
261
262 1
    def register_server(self, server, discovery_configuration=None):
263
        """
264
        register a server to discovery server
265
        if discovery_configuration is provided, the newer register_server2 service call is used
266
        """
267 1
        serv = ua.RegisteredServer()
268 1
        serv.ServerUri = server.application_uri
269 1
        serv.ProductUri = server.product_uri
270 1
        serv.DiscoveryUrls = [server.endpoint.geturl()]
271 1
        serv.ServerType = server.application_type
272 1
        serv.ServerNames = [ua.LocalizedText(server.name)]
273 1
        serv.IsOnline = True
274 1
        if discovery_configuration:
275
            params = ua.RegisterServer2Parameters()
276
            params.Server = serv
277
            params.DiscoveryConfiguration = discovery_configuration
278
            return self.uaclient.register_server2(params)
279
        else:
280 1
            return self.uaclient.register_server(serv)
281
282 1
    def find_servers(self, uris=None):
283
        """
284
        send a FindServer request to the server. The answer should be a list of
285
        servers the server knows about
286
        A list of uris can be provided, only server having matching uris will be returned
287
        """
288 1
        if uris is None:
289 1
            uris = []
290 1
        params = ua.FindServersParameters()
291 1
        params.EndpointUrl = self.server_url.geturl()
292 1
        params.ServerUris = uris
293 1
        return self.uaclient.find_servers(params)
294
295 1
    def find_servers_on_network(self):
296
        params = ua.FindServersOnNetworkParameters()
297
        return self.uaclient.find_servers_on_network(params)
298
299 1
    def create_session(self):
300
        """
301
        send a CreateSessionRequest to server with reasonable parameters.
302
        If you want o modify settings look at code of this methods
303
        and make your own
304
        """
305 1
        desc = ua.ApplicationDescription()
306 1
        desc.ApplicationUri = self.application_uri
307 1
        desc.ProductUri = self.product_uri
308 1
        desc.ApplicationName = ua.LocalizedText(self.name)
309 1
        desc.ApplicationType = ua.ApplicationType.Client
310
311 1
        params = ua.CreateSessionParameters()
312 1
        nonce = utils.create_nonce(32)  # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
313 1
        params.ClientNonce = nonce
314 1
        params.ClientCertificate = self.security_policy.client_certificate
315 1
        params.ClientDescription = desc
316 1
        params.EndpointUrl = self.server_url.geturl()
317 1
        params.SessionName = self.description + " Session" + str(self._session_counter)
318 1
        params.RequestedSessionTimeout = 3600000
319 1
        params.MaxResponseMessageSize = 0  # means no max size
320 1
        response = self.uaclient.create_session(params)
321 1
        if self.security_policy.client_certificate is None:
322 1
            data = nonce
323
        else:
324
            data = self.security_policy.client_certificate + nonce
325 1
        self.security_policy.asymmetric_cryptography.verify(data, response.ServerSignature.Signature)
326 1
        self._server_nonce = response.ServerNonce
327 1
        if not self.security_policy.server_certificate:
328 1
            self.security_policy.server_certificate = response.ServerCertificate
329
        elif self.security_policy.server_certificate != response.ServerCertificate:
330
            raise ua.UaError("Server certificate mismatch")
331
        # remember PolicyId's: we will use them in activate_session()
332 1
        ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
333 1
        self._policy_ids = ep.UserIdentityTokens
334 1
        self.session_timeout = response.RevisedSessionTimeout
335 1
        self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7)  # 0.7 is from spec
336 1
        self.keepalive.start()
337 1
        return response
338
339 1
    def server_policy_id(self, token_type, default):
340
        """
341
        Find PolicyId of server's UserTokenPolicy by token_type.
342
        Return default if there's no matching UserTokenPolicy.
343
        """
344 1
        for policy in self._policy_ids:
345 1
            if policy.TokenType == token_type:
346 1
                return policy.PolicyId
347
        return default
348
349 1
    def server_policy_uri(self, token_type):
350
        """
351
        Find SecurityPolicyUri of server's UserTokenPolicy by token_type.
352
        If SecurityPolicyUri is empty, use default SecurityPolicyUri
353
        of the endpoint
354
        """
355 1
        for policy in self._policy_ids:
356 1
            if policy.TokenType == token_type:
357 1
                if policy.SecurityPolicyUri:
358
                    return policy.SecurityPolicyUri
359
                else:   # empty URI means "use this endpoint's policy URI"
360 1
                    return self.security_policy.URI
361
        return self.security_policy.URI
362
363 1
    def activate_session(self, username=None, password=None, certificate=None):
364
        """
365
        Activate session using either username and password or private_key
366
        """
367 1
        params = ua.ActivateSessionParameters()
368 1
        challenge = b""
369 1
        if self.security_policy.server_certificate is not None:
370
            challenge += self.security_policy.server_certificate
371 1
        if self._server_nonce is not None:
372 1
            challenge += self._server_nonce
373 1
        params.ClientSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
374 1
        params.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(challenge)
375 1
        params.LocaleIds.append("en")
376 1
        if not username and not certificate:
377 1
            params.UserIdentityToken = ua.AnonymousIdentityToken()
378 1
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, b"anonymous")
379 1
        elif certificate:
380
            params.UserIdentityToken = ua.X509IdentityToken()
381
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, b"certificate_basic256")
382
            params.UserIdentityToken.CertificateData = uacrypto.der_from_x509(certificate)
383
            # specs part 4, 5.6.3.1: the data to sign is created by appending
384
            # the last serverNonce to the serverCertificate
385
            sig = uacrypto.sign_sha1(self.user_private_key, challenge)
386
            params.UserTokenSignature = ua.SignatureData()
387
            params.UserTokenSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
388
            params.UserTokenSignature.Signature = sig
389
        else:
390 1
            params.UserIdentityToken = ua.UserNameIdentityToken()
391 1
            params.UserIdentityToken.UserName = username
392 1
            policy_uri = self.server_policy_uri(ua.UserTokenType.UserName)
393 1
            if not policy_uri or policy_uri == security_policies.POLICY_NONE_URI:
394
                # see specs part 4, 7.36.3: if the token is NOT encrypted,
395
                # then the password only contains UTF-8 encoded password
396
                # and EncryptionAlgorithm is null
397 1
                if self.server_url.password:
398
                    self.logger.warning("Sending plain-text password")
399
                    params.UserIdentityToken.Password = password
400 1
                params.UserIdentityToken.EncryptionAlgorithm = ''
401
            elif self.server_url.password:
402
                pubkey = uacrypto.x509_from_der(self.security_policy.server_certificate).public_key()
403
                # see specs part 4, 7.36.3: if the token is encrypted, password
404
                # shall be converted to UTF-8 and serialized with server nonce
405
                passwd = bytes(password, "utf8")
406
                if self._server_nonce is not None:
407
                    passwd += self._server_nonce
408
                etoken = ua.pack_bytes(passwd)
409
                data, uri = security_policies.encrypt_asymmetric(pubkey, etoken, policy_uri)
410
                params.UserIdentityToken.Password = data
411
                params.UserIdentityToken.EncryptionAlgorithm = uri
412 1
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, b"username_basic256")
413 1
        return self.uaclient.activate_session(params)
414
415 1
    def close_session(self):
416
        """
417
        Close session
418
        """
419 1
        if self.keepalive:
420 1
            self.keepalive.stop()
421 1
        return self.uaclient.close_session(True)
422
423 1
    def get_root_node(self):
424 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))
425
426 1
    def get_objects_node(self):
427 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))
428
429 1
    def get_server_node(self):
430 1
        return self.get_node(ua.FourByteNodeId(ua.ObjectIds.Server))
431
432 1
    def get_node(self, nodeid):
433
        """
434
        Get node using NodeId object or a string representing a NodeId
435
        """
436 1
        return Node(self.uaclient, nodeid)
437
438 1
    def create_subscription(self, period, handler):
439
        """
440
        Create a subscription.
441
        returns a Subscription object which allow
442
        to subscribe to events or data on server
443
        handler argument is a class with data_change and/or event methods.
444
        These methods will be called when notfication from server are received.
445
        See example-client.py.
446
        Do not do expensive/slow or network operation from these methods
447
        since they are called directly from receiving thread. This is a design choice,
448
        start another thread if you need to do such a thing.
449
        """
450 1
        params = ua.CreateSubscriptionParameters()
451 1
        params.RequestedPublishingInterval = period
452 1
        params.RequestedLifetimeCount = 3000
453 1
        params.RequestedMaxKeepAliveCount = 10000
454 1
        params.MaxNotificationsPerPublish = 10000
455 1
        params.PublishingEnabled = True
456 1
        params.Priority = 0
457 1
        return Subscription(self.uaclient, params, handler)
458
459 1
    def get_namespace_array(self):
460 1
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
461 1
        return ns_node.get_value()
462
463 1
    def get_namespace_index(self, uri):
464 1
        uries = self.get_namespace_array()
465 1
        return uries.index(uri)
466
467 1
    def delete_nodes(self, nodes, recursive=False):
468 1
        return delete_nodes(self.uaclient, nodes, recursive)
469
            
470