Passed
Push — dev ( b34770...12545c )
by Olivier
02:36
created

opcua.client.Client.connect_and_find_servers()   A

Complexity

Conditions 1

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.6699
Metric Value
cc 1
dl 0
loc 11
ccs 1
cts 8
cp 0.125
crap 1.6699
rs 9.4285
1 1
from __future__ import division  # support for python2
2 1
from threading import Thread, Condition
3 1
import logging
4 1
try:
5 1
    from urllib.parse import urlparse
6
except ImportError:  # support for python2
7
    from urlparse import urlparse
8
9 1
from opcua import ua
10 1
from opcua.client.ua_client import UaClient
11 1
from opcua.common.node import Node
12 1
from opcua.common.manage_nodes import delete_nodes
13 1
from opcua.common.subscription import Subscription
14 1
from opcua.common import utils
15 1
from opcua.crypto import security_policies
16 1
# Optional dependency guard: use_crypto tells the rest of the module whether
# opcua.crypto.uacrypto could be imported. When the import fails the name
# `uacrypto` stays undefined, so code using it must only run when
# use_crypto is True.
use_crypto = True
try:
    from opcua.crypto import uacrypto
except ImportError:
    print("cryptography is not installed, use of crypto disabled")
    use_crypto = False
22
23
24 1
class KeepAlive(Thread):

    """
    Used by Client to keep session opened.
    OPCUA defines timeout both for sessions and secure channel

    The thread periodically renews the secure channel of the client and
    reads the server state node, until stop() is called.
    """

    def __init__(self, client, timeout):
        """
        client is the object used by run(): it must provide get_node()
        and open_secure_channel(renew=True).
        timeout is the period between two renewals, in milliseconds.
        A value of 0 is replaced by 360000.
        """
        Thread.__init__(self)
        self.logger = logging.getLogger(__name__)
        if timeout == 0:  # means no timeout but we do not trust such servers
            timeout = 360000
        self.timeout = timeout
        self.client = client
        self._dostop = False
        # protects _dostop and lets stop() interrupt the wait in run()
        self._cond = Condition()

    def run(self):
        """
        Thread body: wait one period, then renew the channel and read the
        server state. Returns as soon as a stop has been requested.
        """
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
        server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
        while True:
            with self._cond:
                # The flag is tested while holding the lock, so a stop()
                # issued just before the wait cannot be missed.
                if not self._dostop:
                    self._cond.wait(self.timeout / 1000)  # wait() expects seconds
                if self._dostop:
                    break
            self.logger.debug("renewing channel")
            self.client.open_secure_channel(renew=True)
            val = server_state.get_value()
            self.logger.debug("server state is: %s ", val)
        self.logger.debug("keepalive thread has stopped")

    def stop(self):
        """
        Ask the thread to stop and wake it up if it is waiting.
        Does not wait for the thread to terminate.
        """
        self.logger.debug("stoping keepalive thread")
        with self._cond:
            # set the flag under the lock so run() sees it before waiting
            self._dostop = True
            self._cond.notify_all()
60
61
62 1
class Client(object):

    """
    High level client to connect to an OPC-UA server.

    This class makes it easy to connect and browse address space.
    It attempts to expose as much functionality as possible
    but if you want to do special things you will probably need
    to work with the UaClient object, available as self.uaclient
    which offers a raw OPC-UA interface.
    """

    def __init__(self, url, timeout=4):
        """
        url is parsed with urlparse: hostname and port are used to open
        the socket, username and password (when present in the url) are
        used by connect().
        If you are unsure of url, write at least hostname and port
        and call get_endpoints.
        timeout is passed to UaClient: it is the timeout to get an answer
        for requests to server.
        Public members of this class are available to be set by API users.
        """
        self.logger = logging.getLogger(__name__)
        self.server_url = urlparse(url)
        self.name = "Pure Python Client"
        self.description = self.name
        self.application_uri = "urn:freeopcua:client"
        self.product_uri = "urn:freeopcua.github.no:client"
        self.security_policy = ua.SecurityPolicy()
        self.secure_channel_id = None
        self.default_timeout = 3600000
        # requested values; both are overwritten by the values revised by
        # the server (see open_secure_channel and create_session)
        self.secure_channel_timeout = self.default_timeout
        self.session_timeout = self.default_timeout
        self._policy_ids = []
        self.uaclient = UaClient(timeout)
        self.user_certificate = None
        self.user_private_key = None
        self._session_counter = 1
        self.keepalive = None

    @staticmethod
    def find_endpoint(endpoints, security_mode, policy_uri):
        """
        Find endpoint with required security mode and policy URI.

        Return the first endpoint of endpoints whose EndpointUrl starts
        with ua.OPC_TCP_SCHEME and whose SecurityMode and
        SecurityPolicyUri match. Raise ValueError if there is none.
        """
        for ep in endpoints:
            if (ep.EndpointUrl.startswith(ua.OPC_TCP_SCHEME) and
                    ep.SecurityMode == security_mode and
                    ep.SecurityPolicyUri == policy_uri):
                return ep
        raise ValueError("No matching endpoints: {}, {}".format(
                         security_mode, policy_uri))

    def set_security_string(self, string):
        """
        Set SecureConnection mode. String format:
        Policy,Mode,certificate,private_key[,server_certificate]
        where Policy is the suffix of a SecurityPolicy class of
            security_policies (for example Basic128Rsa15 or Basic256),
            Mode is a member of ua.MessageSecurityMode
            (for example Sign or SignAndEncrypt)
            certificate, private_key and server_certificate are
                paths to .pem or .der files
        An empty string (or None) is ignored.
        Raise ua.UaError if less than 4 values are given.
        Call this before connect()
        """
        if not string:
            return
        parts = string.split(',')
        if len(parts) < 4:
            raise ua.UaError('Wrong format: `{}`, expected at least 4 '
                             'comma-separated values'.format(string))
        policy_class = getattr(security_policies, 'SecurityPolicy' + parts[0])
        mode = getattr(ua.MessageSecurityMode, parts[1])
        # the fifth value is optional
        server_certificate_path = parts[4] if len(parts) >= 5 else None
        return self.set_security(policy_class, parts[2], parts[3],
                                 server_certificate_path, mode)

    def set_security(self, policy, certificate_path, private_key_path,
                     server_certificate_path=None,
                     mode=ua.MessageSecurityMode.SignAndEncrypt):
        """
        Set SecureConnection mode.

        policy is a security policy class: it is called with
        (server certificate, client certificate, private key, mode)
        and must have a URI attribute.
        If server_certificate_path is None, the server is contacted to
        fetch the certificate of the endpoint matching mode and policy.
        Call this before connect()
        """
        if server_certificate_path is None:
            # load certificate from server's list of endpoints
            endpoints = self.connect_and_get_server_endpoints()
            endpoint = Client.find_endpoint(endpoints, mode, policy.URI)
            server_cert = uacrypto.x509_from_der(endpoint.ServerCertificate)
        else:
            server_cert = uacrypto.load_certificate(server_certificate_path)
        cert = uacrypto.load_certificate(certificate_path)
        pk = uacrypto.load_private_key(private_key_path)
        self.security_policy = policy(server_cert, cert, pk, mode)
        self.uaclient.set_security(self.security_policy)

    def load_client_certificate(self, path):
        """
        load our certificate from file, either pem or der
        The result is stored in self.user_certificate
        """
        self.user_certificate = uacrypto.load_certificate(path)

    def load_private_key(self, path):
        """
        load our private key from file and store it in self.user_private_key
        """
        self.user_private_key = uacrypto.load_private_key(path)

    def _query_and_disconnect(self, request):
        """
        Connect, open a secure channel, call request() and disconnect.

        Return the result of request(). The socket is always
        disconnected, even when one of the steps raises.
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            result = request()
            self.close_secure_channel()
        finally:
            self.disconnect_socket()
        return result

    def connect_and_get_server_endpoints(self):
        """
        Connect, ask server for endpoints, and disconnect
        """
        return self._query_and_disconnect(self.get_endpoints)

    def connect_and_find_servers(self):
        """
        Connect, ask server for a list of known servers, and disconnect
        """
        # a secure channel is opened although spec says it should not be
        # necessary to open channel
        return self._query_and_disconnect(self.find_servers)

    def connect_and_find_servers_on_network(self):
        """
        Connect, ask server for a list of known servers on network, and disconnect
        """
        return self._query_and_disconnect(self.find_servers_on_network)

    def connect(self):
        """
        High level method
        Connect, create and activate session
        Username and password are taken from the url, the user
        certificate from self.user_certificate.
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            self.create_session()
        except Exception:
            # do not leave the socket open when the handshake fails
            self.disconnect_socket()
            raise
        self.activate_session(username=self.server_url.username,
                              password=self.server_url.password,
                              certificate=self.user_certificate)

    def disconnect(self):
        """
        High level method
        Close session, secure channel and socket
        The socket is disconnected even if closing the session or the
        channel raises.
        """
        try:
            self.close_session()
            self.close_secure_channel()
        finally:
            self.disconnect_socket()

    def connect_socket(self):
        """
        connect to socket defined in url
        """
        self.uaclient.connect_socket(self.server_url.hostname, self.server_url.port)

    def disconnect_socket(self):
        """
        disconnect the socket of the underlying UaClient
        """
        self.uaclient.disconnect_socket()

    def send_hello(self):
        """
        Send OPC-UA hello to server
        """
        ack = self.uaclient.send_hello(self.server_url.geturl())
        # FIXME check ack

    def open_secure_channel(self, renew=False):
        """
        Open secure channel, if renew is True, renew channel
        self.secure_channel_timeout is sent as requested lifetime and
        is then replaced by the lifetime revised by the server.
        """
        params = ua.OpenSecureChannelParameters()
        params.ClientProtocolVersion = 0
        params.RequestType = ua.SecurityTokenRequestType.Issue
        if renew:
            params.RequestType = ua.SecurityTokenRequestType.Renew
        params.SecurityMode = self.security_policy.Mode
        params.RequestedLifetime = self.secure_channel_timeout
        # length should be equal to the length of key of symmetric encryption
        nonce = utils.create_nonce(self.security_policy.symmetric_key_size)
        params.ClientNonce = nonce  # this nonce is used to create a symmetric key
        result = self.uaclient.open_secure_channel(params)
        self.security_policy.make_symmetric_key(nonce, result.ServerNonce)
        self.secure_channel_timeout = result.SecurityToken.RevisedLifetime

    def close_secure_channel(self):
        """
        Close secure channel, return the result of UaClient.close_secure_channel
        """
        return self.uaclient.close_secure_channel()

    def get_endpoints(self):
        """
        Ask the server for its endpoints, using the client url as EndpointUrl
        """
        params = ua.GetEndpointsParameters()
        params.EndpointUrl = self.server_url.geturl()
        return self.uaclient.get_endpoints(params)

    def register_server(self, server, discovery_configuration=None):
        """
        register a server to discovery server
        server must provide application_uri, product_uri, endpoint
        (an object with geturl()), application_type and name.
        if discovery_configuration is provided, the newer register_server2 service call is used
        """
        serv = ua.RegisteredServer()
        serv.ServerUri = server.application_uri
        serv.ProductUri = server.product_uri
        serv.DiscoveryUrls = [server.endpoint.geturl()]
        serv.ServerType = server.application_type
        serv.ServerNames = [ua.LocalizedText(server.name)]
        serv.IsOnline = True
        if discovery_configuration:
            params = ua.RegisterServer2Parameters()
            params.Server = serv
            params.DiscoveryConfiguration = discovery_configuration
            return self.uaclient.register_server2(params)
        return self.uaclient.register_server(serv)

    def find_servers(self, uris=None):
        """
        send a FindServer request to the server. The answer should be a list of
        servers the server knows about
        A list of uris can be provided, only server having matching uris will be returned
        """
        if uris is None:
            uris = []
        params = ua.FindServersParameters()
        params.EndpointUrl = self.server_url.geturl()
        params.ServerUris = uris
        return self.uaclient.find_servers(params)

    def find_servers_on_network(self):
        """
        send a FindServersOnNetwork request with default parameters
        """
        params = ua.FindServersOnNetworkParameters()
        return self.uaclient.find_servers_on_network(params)

    def create_session(self):
        """
        Create a session on the server and start the keepalive thread.

        The server signature is verified, the server nonce and the user
        token policies of the matching endpoint are remembered for
        activate_session(). Raise ua.UaError if the certificate sent by
        the server differs from the one of the security policy.
        Return the response of the server.
        """
        desc = ua.ApplicationDescription()
        desc.ApplicationUri = self.application_uri
        desc.ProductUri = self.product_uri
        desc.ApplicationName = ua.LocalizedText(self.name)
        desc.ApplicationType = ua.ApplicationType.Client

        params = ua.CreateSessionParameters()
        # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
        nonce = utils.create_nonce(32)
        params.ClientNonce = nonce
        params.ClientCertificate = self.security_policy.client_certificate
        params.ClientDescription = desc
        params.EndpointUrl = self.server_url.geturl()
        params.SessionName = self.description + " Session" + str(self._session_counter)
        # public member, defaults to 3600000; users may change it before connecting
        params.RequestedSessionTimeout = self.session_timeout
        params.MaxResponseMessageSize = 0  # means no max size
        response = self.uaclient.create_session(params)
        self.security_policy.asymmetric_cryptography.verify(
            self.security_policy.client_certificate + nonce,
            response.ServerSignature.Signature)
        self._server_nonce = response.ServerNonce
        if not self.security_policy.server_certificate:
            self.security_policy.server_certificate = response.ServerCertificate
        elif self.security_policy.server_certificate != response.ServerCertificate:
            raise ua.UaError("Server certificate mismatch")
        # remember PolicyId's: we will use them in activate_session()
        ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
        self._policy_ids = ep.UserIdentityTokens
        self.session_timeout = response.RevisedSessionTimeout
        # renew before the shortest of the two timeouts expires; 0.7 is from spec
        period = min(self.session_timeout, self.secure_channel_timeout) * 0.7
        self.keepalive = KeepAlive(self, period)
        self.keepalive.start()
        return response

    def server_policy_id(self, token_type, default):
        """
        Find PolicyId of server's UserTokenPolicy by token_type.
        Return default if there's no matching UserTokenPolicy.
        """
        for policy in self._policy_ids:
            if policy.TokenType == token_type:
                return policy.PolicyId
        return default

    def activate_session(self, username=None, password=None, certificate=None):
        """
        Activate session using either username and password or certificate

        Without username and certificate an anonymous token is used.
        A certificate takes precedence over a username; its signature is
        made with self.user_private_key.
        Must be called after create_session().
        """
        params = ua.ActivateSessionParameters()
        challenge = self.security_policy.server_certificate + self._server_nonce
        params.ClientSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(challenge)
        params.LocaleIds.append("en")
        if not username and not certificate:
            params.UserIdentityToken = ua.AnonymousIdentityToken()
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, b"anonymous")
        elif certificate:
            params.UserIdentityToken = ua.X509IdentityToken()
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, b"certificate_basic256")
            params.UserIdentityToken.CertificateData = uacrypto.der_from_x509(certificate)
            # specs part 4, 5.6.3.1: the data to sign is created by appending
            # the last serverNonce to the serverCertificate
            sig = uacrypto.sign_sha1(self.user_private_key, challenge)
            params.UserTokenSignature = ua.SignatureData()
            params.UserTokenSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
            params.UserTokenSignature.Signature = sig
        else:
            params.UserIdentityToken = ua.UserNameIdentityToken()
            params.UserIdentityToken.UserName = username
            # use the password given by the caller, which is not
            # necessarily the one of the url
            if password:
                pubkey = uacrypto.x509_from_der(self.security_policy.server_certificate).public_key()
                # see specs part 4, 7.36.3: if the token is encrypted, password
                # shall be converted to UTF-8 and serialized with server nonce
                # (str.encode works with both python2 and python3)
                etoken = ua.pack_bytes(password.encode("utf8") + self._server_nonce)
                data = uacrypto.encrypt_rsa_oaep(pubkey, etoken)
                params.UserIdentityToken.Password = data
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, b"username_basic256")
            params.UserIdentityToken.EncryptionAlgorithm = 'http://www.w3.org/2001/04/xmlenc#rsa-oaep'
        return self.uaclient.activate_session(params)

    def close_session(self):
        """
        Close session
        The keepalive thread, if any, is asked to stop first.
        """
        if self.keepalive:
            self.keepalive.stop()
        return self.uaclient.close_session(True)

    def get_root_node(self):
        """
        Return the node with id ua.ObjectIds.RootFolder
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))

    def get_objects_node(self):
        """
        Return the node with id ua.ObjectIds.ObjectsFolder
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))

    def get_server_node(self):
        """
        Return the node with id ua.ObjectIds.Server
        """
        return self.get_node(ua.FourByteNodeId(ua.ObjectIds.Server))

    def get_node(self, nodeid):
        """
        Get node using NodeId object or a string representing a NodeId
        """
        return Node(self.uaclient, nodeid)

    def create_subscription(self, period, handler):
        """
        Create a subscription.
        returns a Subscription object which allow
        to subscribe to events or data on server
        period is used as requested publishing interval.
        handler argument is a class with data_change and/or event methods.
        These methods will be called when notification from server are received.
        See example-client.py.
        Do not do expensive/slow or network operation from these methods
        since they are called directly from receiving thread. This is a design choice,
        start another thread if you need to do such a thing.
        """
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = period
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 10000
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.uaclient, params, handler)

    def get_namespace_array(self):
        """
        Return the value of the node ua.ObjectIds.Server_NamespaceArray
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        return ns_node.get_value()

    def get_namespace_index(self, uri):
        """
        Return the position of uri in the namespace array of the server,
        using the index() method of the array
        """
        uris = self.get_namespace_array()
        return uris.index(uri)

    def delete_nodes(self, nodes, recursive=False):
        """
        Delete nodes on the server, see opcua.common.manage_nodes.delete_nodes
        """
        return delete_nodes(self.uaclient, nodes, recursive)
428
            
429