Completed
Push — master ( deca8f...028a36 )
by Olivier
02:22
created

opcua.Client   B

Complexity

Total Complexity 51

Size/Duplication

Total Lines 361
Duplicated Lines 0 %

Test Coverage

Coverage 71.89%
Metric Value
wmc 51
dl 0
loc 361
ccs 156
cts 217
cp 0.7189
rs 8.3206

31 Methods

Rating   Name   Duplication   Size   Complexity  
A find_servers() 0 12 2
A connect_and_get_server_endpoints() 0 11 1
A send_hello() 0 5 1
A connect_and_find_servers() 0 11 1
A load_private_key() 0 2 1
B create_session() 0 30 3
B __init__() 0 26 1
A open_secure_channel() 0 16 2
A set_security_string() 0 20 4
A set_security() 0 18 2
A load_client_certificate() 0 5 1
A close_secure_channel() 0 2 1
A disconnect_socket() 0 2 1
A connect_and_find_servers_on_network() 0 11 1
A server_policy_id() 0 9 3
A find_servers_on_network() 0 3 1
B find_endpoint() 0 12 5
A connect() 0 10 1
A register_server() 0 19 2
A disconnect() 0 8 1
A connect_socket() 0 5 1
A get_endpoints() 0 4 1
A get_objects_node() 0 2 1
A close_session() 0 7 2
A get_namespace_index() 0 3 1
A get_node() 0 5 1
B activate_session() 0 34 5
A get_server_node() 0 2 1
A get_root_node() 0 2 1
A create_subscription() 0 20 1
A get_namespace_array() 0 3 1

How to fix   Complexity   

Complex Class

Complex classes like opcua.Client often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
from __future__ import division  # support for python2
2 1
import os
3 1
from threading import Thread, Condition
4 1
import logging
5 1
try:
6 1
    from urllib.parse import urlparse
7
except ImportError:  # support for python2
8
    from urlparse import urlparse
9
10 1
from opcua import uaprotocol as ua
11 1
from opcua import BinaryClient, Node, Subscription
12 1
from opcua import utils
13 1
from opcua import security_policies
14 1
# Crypto support is optional: `use_crypto` records whether the uacrypto
# helper module could be imported.
use_crypto = True
try:
    from opcua import uacrypto
except ImportError:
    # Only a failed import means "crypto unavailable"; any other error
    # (including KeyboardInterrupt/SystemExit) must propagate.
    print("cryptography is not installed, use of crypto disabled")
    use_crypto = False
20
21
22 1
class KeepAlive(Thread):

    """
    Used by Client to keep session opened.
    OPCUA defines timeout both for sessions and secure channel

    The thread periodically renews the secure channel and reads the server
    state node. ``timeout`` is the period in milliseconds; a value of 0 is
    replaced by 360000.
    """

    def __init__(self, client, timeout):
        """
        client: object providing get_node() and open_secure_channel()
        timeout: period between two renewals, in milliseconds
        """
        Thread.__init__(self)
        self.logger = logging.getLogger(__name__)
        if timeout == 0:  # means no timeout but we do not trust such servers
            timeout = 360000
        self.timeout = timeout
        self.client = client
        self._dostop = False
        self._cond = Condition()

    def run(self):
        """
        Renew the channel every ``timeout`` milliseconds until stop() is called.
        """
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
        server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
        while True:
            with self._cond:
                # The stop flag is read under the same lock stop() uses to
                # set it, so a stop request issued before we start waiting
                # is never missed.
                if not self._dostop:
                    self._cond.wait(self.timeout / 1000.0)
                if self._dostop:
                    break
            self.logger.debug("renewing channel")
            self.client.open_secure_channel(renew=True)
            val = server_state.get_value()
            self.logger.debug("server state is: %s ", val)
        self.logger.debug("keepalive thread has stopped")

    def stop(self):
        """
        Ask the thread to terminate and wake it up if it is waiting.
        """
        self.logger.debug("stoping keepalive thread")
        with self._cond:
            # Set the flag before notifying, while holding the lock, so the
            # woken thread is guaranteed to observe it.
            self._dostop = True
            self._cond.notify_all()
58
59
60 1
class Client(object):

    """
    High level client to connect to an OPC-UA server.
    This class makes it easy to connect and browse address space.
    It attempts to expose as much functionality as possible
    but if you want to do special things you will probably need
    to work with the BinaryClient object, available as self.bclient
    which offers a raw OPC-UA interface.
    """

    def __init__(self, url, timeout=1):
        """
        used url argument to connect to server.
        if you are unsure of url, write at least hostname and port
        and call get_endpoints
        timeout is the timeout to get an answer for requests to server
        public members of this class are available to be set by API users
        """
        self.logger = logging.getLogger(__name__)
        self.server_url = urlparse(url)
        self.name = "Pure Python Client"
        self.description = self.name
        self.application_uri = "urn:freeopcua:client"
        self.product_uri = "urn:freeopcua.github.no:client"
        self.security_policy = ua.SecurityPolicy()
        self.secure_channel_id = None
        self.default_timeout = 3600000
        self.secure_channel_timeout = self.default_timeout
        self.session_timeout = self.default_timeout
        self._policy_ids = []
        self.bclient = BinaryClient(timeout)
        self.user_certificate = None
        self.user_private_key = None
        self._session_counter = 1
        self.keepalive = None

    @staticmethod
    def find_endpoint(endpoints, security_mode, policy_uri):
        """
        Find endpoint with required security mode and policy URI

        Return the first endpoint whose URL starts with ua.OPC_TCP_SCHEME
        and whose mode and policy URI match.
        Raise ValueError if no endpoint matches.
        """
        for ep in endpoints:
            if (ep.EndpointUrl.startswith(ua.OPC_TCP_SCHEME) and
                    ep.SecurityMode == security_mode and
                    ep.SecurityPolicyUri == policy_uri):
                return ep
        raise ValueError("No matching endpoints: {}, {}".format(
                         security_mode, policy_uri))

    def set_security_string(self, string):
        """
        Set SecureConnection mode. String format:
        Policy,Mode,certificate,private_key[,server_certificate]
        where Policy is Basic128Rsa15 or Basic256,
            Mode is Sign or SignAndEncrypt
            certificate, private_key and server_certificate are
                paths to .pem or .der files
        An empty string is ignored.
        Call this before connect()
        """
        if not string:
            return
        parts = string.split(',')
        if len(parts) < 4:
            raise Exception('Wrong format: `{}`, expected at least 4 '
                    'comma-separated values'.format(string))
        policy_class = getattr(security_policies, 'SecurityPolicy' + parts[0])
        mode = getattr(ua.MessageSecurityMode, parts[1])
        # the fifth field is optional and is the server certificate path
        server_certificate_path = parts[4] if len(parts) >= 5 else None
        return self.set_security(policy_class, parts[2], parts[3],
                                 server_certificate_path, mode)

    def set_security(self, policy, certificate_path, private_key_path,
                     server_certificate_path=None,
                     mode=ua.MessageSecurityMode.SignAndEncrypt):
        """
        Set SecureConnection mode.
        If server_certificate_path is None, the client connects to the
        server, reads its endpoints and uses the certificate of the
        endpoint matching mode and policy.
        Call this before connect()
        """
        if server_certificate_path is None:
            # load certificate from server's list of endpoints
            endpoints = self.connect_and_get_server_endpoints()
            endpoint = Client.find_endpoint(endpoints, mode, policy.URI)
            server_cert = uacrypto.x509_from_der(endpoint.ServerCertificate)
        else:
            server_cert = uacrypto.load_certificate(server_certificate_path)
        cert = uacrypto.load_certificate(certificate_path)
        pk = uacrypto.load_private_key(private_key_path)
        self.security_policy = policy(server_cert, cert, pk, mode)
        self.bclient.set_security(self.security_policy)

    def load_client_certificate(self, path):
        """
        load our certificate from file, either pem or der
        """
        self.user_certificate = uacrypto.load_certificate(path)

    def load_private_key(self, path):
        """
        load the user private key from file and store it in
        self.user_private_key
        """
        self.user_private_key = uacrypto.load_private_key(path)

    def connect_and_get_server_endpoints(self):
        """
        Connect, ask server for endpoints, and disconnect
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            endpoints = self.get_endpoints()
            self.close_secure_channel()
        finally:
            # never leave the socket open, even if a request failed
            self.disconnect_socket()
        return endpoints

    def connect_and_find_servers(self):
        """
        Connect, ask server for a list of known servers, and disconnect
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()  # spec says it should not be necessary to open channel
            servers = self.find_servers()
            self.close_secure_channel()
        finally:
            # never leave the socket open, even if a request failed
            self.disconnect_socket()
        return servers

    def connect_and_find_servers_on_network(self):
        """
        Connect, ask server for a list of known servers on network, and disconnect
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            servers = self.find_servers_on_network()
            self.close_secure_channel()
        finally:
            # never leave the socket open, even if a request failed
            self.disconnect_socket()
        return servers

    def connect(self):
        """
        High level method
        Connect, create and activate session
        Username and password are taken from the url given at construction.
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            self.create_session()
        except Exception:
            # clean up the open socket before reporting the failure
            self.disconnect_socket()
            raise
        self.activate_session(username=self.server_url.username,
                              password=self.server_url.password,
                              certificate=self.user_certificate)

    def disconnect(self):
        """
        High level method
        Close session, secure channel and socket
        """
        try:
            self.close_session()
            self.close_secure_channel()
        finally:
            # the socket is closed even if closing session/channel failed
            self.disconnect_socket()

    def connect_socket(self):
        """
        connect to socket defined in url
        """
        self.bclient.connect_socket(self.server_url.hostname, self.server_url.port)

    def disconnect_socket(self):
        """
        disconnect the socket of the underlying binary client
        """
        self.bclient.disconnect_socket()

    def send_hello(self):
        """
        Send OPC-UA hello to server
        """
        # FIXME check the acknowledge returned by the server
        self.bclient.send_hello(self.server_url.geturl())

    def open_secure_channel(self, renew=False):
        """
        Open secure channel, if renew is True, renew channel
        """
        params = ua.OpenSecureChannelParameters()
        params.ClientProtocolVersion = 0
        if renew:
            params.RequestType = ua.SecurityTokenRequestType.Renew
        else:
            params.RequestType = ua.SecurityTokenRequestType.Issue
        params.SecurityMode = self.security_policy.Mode
        params.RequestedLifetime = self.secure_channel_timeout
        # length should be equal to the length of key of symmetric encryption
        nonce = utils.create_nonce(self.security_policy.symmetric_key_size)
        params.ClientNonce = nonce  # this nonce is used to create a symmetric key
        result = self.bclient.open_secure_channel(params)
        self.security_policy.make_symmetric_key(nonce, result.ServerNonce)
        self.secure_channel_timeout = result.SecurityToken.RevisedLifetime

    def close_secure_channel(self):
        """
        Close the secure channel of the underlying binary client
        """
        return self.bclient.close_secure_channel()

    def get_endpoints(self):
        """
        Ask the server for its endpoints, using the client url as EndpointUrl
        """
        params = ua.GetEndpointsParameters()
        params.EndpointUrl = self.server_url.geturl()
        return self.bclient.get_endpoints(params)

    def register_server(self, server, discovery_configuration=None):
        """
        register a server to discovery server
        if discovery_configuration is provided, the newer register_server2 service call is used
        """
        serv = ua.RegisteredServer()
        serv.ServerUri = server.application_uri
        serv.ProductUri = server.product_uri
        serv.DiscoveryUrls = [server.endpoint.geturl()]
        serv.ServerType = server.application_type
        serv.ServerNames = [ua.LocalizedText(server.name)]
        serv.IsOnline = True
        if discovery_configuration:
            params = ua.RegisterServer2Parameters()
            params.Server = serv
            params.DiscoveryConfiguration = discovery_configuration
            return self.bclient.register_server2(params)
        return self.bclient.register_server(serv)

    def find_servers(self, uris=None):
        """
        send a FindServer request to the server. The answer should be a list of
        servers the server knows about
        A list of uris can be provided, only server having matching uris will be returned
        """
        if uris is None:
            uris = []
        params = ua.FindServersParameters()
        params.EndpointUrl = self.server_url.geturl()
        params.ServerUris = uris
        return self.bclient.find_servers(params)

    def find_servers_on_network(self):
        """
        send a FindServersOnNetwork request with default parameters
        """
        params = ua.FindServersOnNetworkParameters()
        return self.bclient.find_servers_on_network(params)

    def create_session(self):
        """
        Create a session on the server, verify the server signature,
        remember the user token policies of the matching endpoint and
        start the keepalive thread.
        Return the server response.
        Raise Exception if the server certificate does not match the one
        already configured in the security policy.
        """
        desc = ua.ApplicationDescription()
        desc.ApplicationUri = self.application_uri
        desc.ProductUri = self.product_uri
        desc.ApplicationName = ua.LocalizedText(self.name)
        desc.ApplicationType = ua.ApplicationType.Client

        params = ua.CreateSessionParameters()
        # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
        nonce = utils.create_nonce(32)
        params.ClientNonce = nonce
        params.ClientCertificate = self.security_policy.client_certificate
        params.ClientDescription = desc
        params.EndpointUrl = self.server_url.geturl()
        params.SessionName = self.description + " Session" + str(self._session_counter)
        # honour the public session_timeout attribute (defaults to 3600000)
        params.RequestedSessionTimeout = self.session_timeout
        params.MaxResponseMessageSize = 0  # means no max size
        response = self.bclient.create_session(params)
        self.security_policy.asymmetric_cryptography.verify(
            self.security_policy.client_certificate + nonce,
            response.ServerSignature.Signature)
        self._server_nonce = response.ServerNonce
        if not self.security_policy.server_certificate:
            self.security_policy.server_certificate = response.ServerCertificate
        elif self.security_policy.server_certificate != response.ServerCertificate:
            raise Exception("Server certificate mismatch")
        # remember PolicyId's: we will use them in activate_session()
        ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
        self._policy_ids = ep.UserIdentityTokens
        self.session_timeout = response.RevisedSessionTimeout
        self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7)  # 0.7 is from spec
        self.keepalive.start()
        return response

    def server_policy_id(self, token_type, default):
        """
        Find PolicyId of server's UserTokenPolicy by token_type.
        Return default if there's no matching UserTokenPolicy.
        """
        for policy in self._policy_ids:
            if policy.TokenType == token_type:
                return policy.PolicyId
        return default

    def activate_session(self, username=None, password=None, certificate=None):
        """
        Activate session using either username and password or private_key

        Without username and certificate an anonymous token is used.
        A certificate takes precedence over a username.
        """
        params = ua.ActivateSessionParameters()
        # data to sign: server certificate followed by the last server nonce
        challenge = self.security_policy.server_certificate + self._server_nonce
        params.ClientSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(challenge)
        params.LocaleIds.append("en")
        if not username and not certificate:
            params.UserIdentityToken = ua.AnonymousIdentityToken()
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, b"anonymous")
        elif certificate:
            params.UserIdentityToken = ua.X509IdentityToken()
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, b"certificate_basic256")
            params.UserIdentityToken.CertificateData = uacrypto.der_from_x509(certificate)
            # specs part 4, 5.6.3: the user token signature is computed over
            # the server certificate + server nonce, not over our certificate
            sig = uacrypto.sign_sha1(self.user_private_key, challenge)
            params.UserTokenSignature = ua.SignatureData()
            params.UserTokenSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
            params.UserTokenSignature.Signature = sig
        else:
            params.UserIdentityToken = ua.UserNameIdentityToken()
            params.UserIdentityToken.UserName = username
            # use the password argument, not the one from the url, so that
            # explicit calls to activate_session() are honoured
            if password:
                pubkey = uacrypto.x509_from_der(self.security_policy.server_certificate).public_key()
                # see specs part 4, 7.36.3: if the token is encrypted, password
                # shall be converted to UTF-8 and serialized with server nonce
                etoken = ua.pack_bytes(password.encode("utf8") + self._server_nonce)
                data = uacrypto.encrypt_rsa_oaep(pubkey, etoken)
                params.UserIdentityToken.Password = data
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, b"username_basic256")
            params.UserIdentityToken.EncryptionAlgorithm = 'http://www.w3.org/2001/04/xmlenc#rsa-oaep'
        return self.bclient.activate_session(params)

    def close_session(self):
        """
        Close session
        Stops the keepalive thread first if one was started.
        """
        if self.keepalive:
            self.keepalive.stop()
        return self.bclient.close_session(True)

    def get_root_node(self):
        """
        Return a Node for ObjectIds.RootFolder
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))

    def get_objects_node(self):
        """
        Return a Node for ObjectIds.ObjectsFolder
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))

    def get_server_node(self):
        """
        Return a Node for ObjectIds.Server
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.Server))

    def get_node(self, nodeid):
        """
        Get node using NodeId object or a string representing a NodeId
        """
        return Node(self.bclient, nodeid)

    def create_subscription(self, period, handler):
        """
        Create a subscription.
        returns a Subscription object which allow
        to subscribe to events or data on server
        handler argument is a class with data_change and/or event methods.
        These methods will be called when notifications from server are received.
        See example-client.py.
        Do not do expensive/slow or network operation from these methods
        since they are called directly from receiving thread. This is a design choice,
        start another thread if you need to do such a thing.
        """
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = period
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 10000
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.bclient, params, handler)

    def get_namespace_array(self):
        """
        Return the value of the Server_NamespaceArray node
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        return ns_node.get_value()

    def get_namespace_index(self, uri):
        """
        Return the index of uri in the server namespace array
        """
        uris = self.get_namespace_array()
        return uris.index(uri)
421