Completed
Pull Request — master (#128)
by Alexander
02:25
created

opcua.client.Client.find_servers()   A

Complexity

Conditions 2

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 2
Metric Value
cc 2
dl 0
loc 12
ccs 7
cts 7
cp 1
crap 2
rs 9.4285
1 1
from __future__ import division  # support for python2
2 1
from threading import Thread, Condition
3 1
import logging
4 1
try:
5 1
    from urllib.parse import urlparse
6
except ImportError:  # support for python2
7
    from urlparse import urlparse
8
9 1
from opcua import ua
10 1
from opcua.client.ua_client import UaClient
11 1
from opcua.common.node import Node
12 1
from opcua.common.manage_nodes import delete_nodes
13 1
from opcua.common.subscription import Subscription
14 1
from opcua.common import utils
15 1
from opcua.crypto import security_policies
16 1
use_crypto = True
17 1
try:
18 1
    from opcua.crypto import uacrypto
19 1
except ImportError:
20 1
    print("cryptography is not installed, use of crypto disabled")
21 1
    use_crypto = False
22
23
24 1
class KeepAlive(Thread):

    """
    Used by Client to keep session opened.
    OPCUA defines timeout both for sessions and secure channel.

    The thread wakes up every ``timeout`` milliseconds, renews the secure
    channel of the client and reads the server state node.
    """

    def __init__(self, client, timeout):
        """
        client is the Client object whose channel must be kept alive,
        timeout is the wake-up period in milliseconds
        """
        Thread.__init__(self)
        self.logger = logging.getLogger(__name__)
        if timeout == 0:  # means no timeout but we do not trust such servers
            timeout = 360000
        self.timeout = timeout
        self.client = client
        self._dostop = False
        self._cond = Condition()

    def run(self):
        """
        Renew the secure channel once per period until stop() is called
        """
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
        server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
        while True:
            with self._cond:
                # The stop flag is tested while holding the lock, so a stop()
                # issued just before we start waiting cannot be missed.
                if not self._dostop:
                    self._cond.wait(self.timeout / 1000)  # timeout is in ms, wait() takes seconds
                if self._dostop:
                    break
            self.logger.debug("renewing channel")
            self.client.open_secure_channel(renew=True)
            val = server_state.get_value()
            self.logger.debug("server state is: %s ", val)
        self.logger.debug("keepalive thread has stopped")

    def stop(self):
        """
        Ask the thread to stop and wake it up if it is waiting
        """
        self.logger.debug("stoping keepalive thread")
        with self._cond:
            # flag and notification are published atomically with respect to run()
            self._dostop = True
            self._cond.notify_all()
60
61
62 1
class Client(object):

    """
    High level client to connect to an OPC-UA server.

    This class makes it easy to connect and browse address space.
    It attempts to expose as much functionality as possible
    but if you want more flexibility it is possible and advised to
    use UaClient object, available as self.uaclient
    which offers the raw OPC-UA services interface.
    """

    def __init__(self, url, timeout=4):
        """
        url is the address of the server to connect to.
        If you are unsure of url, write at least hostname and port
        and call get_endpoints.
        timeout is the timeout to get an answer for requests to server;
        it is passed to the underlying UaClient.
        Public members of this class are available to be set by API users.
        """
        self.logger = logging.getLogger(__name__)
        self.server_url = urlparse(url)
        self.name = "Pure Python Client"
        self.description = self.name
        self.application_uri = "urn:freeopcua:client"
        self.product_uri = "urn:freeopcua.github.no:client"
        self.security_policy = ua.SecurityPolicy()
        self.secure_channel_id = None
        self.default_timeout = 3600000
        self.secure_channel_timeout = self.default_timeout
        self.session_timeout = self.default_timeout
        self._policy_ids = []
        self._server_nonce = None  # set by create_session()
        self.uaclient = UaClient(timeout)
        self.user_certificate = None
        self.user_private_key = None
        self._session_counter = 1
        self.keepalive = None

    @staticmethod
    def find_endpoint(endpoints, security_mode, policy_uri):
        """
        Find endpoint with required security mode and policy URI.
        Only endpoints whose url starts with ua.OPC_TCP_SCHEME are considered.
        Raise ValueError if no endpoint matches.
        """
        for ep in endpoints:
            if (ep.EndpointUrl.startswith(ua.OPC_TCP_SCHEME) and
                    ep.SecurityMode == security_mode and
                    ep.SecurityPolicyUri == policy_uri):
                return ep
        raise ValueError("No matching endpoints: {}, {}".format(
                         security_mode, policy_uri))

    def set_security_string(self, string):
        """
        Set SecureConnection mode. String format:
        Policy,Mode,certificate,private_key[,server_certificate]
        where Policy is Basic128Rsa15 or Basic256,
            Mode is Sign or SignAndEncrypt
            certificate, private_key and server_certificate are
                paths to .pem or .der files
        An empty string is ignored.
        Call this before connect()
        """
        if not string:
            return
        parts = string.split(',')
        if len(parts) < 4:
            # NOTE(review): the original raised a bare UaError which is not
            # imported in this module; ua.UaError is assumed to be exported
            # by the ua package - confirm.
            raise ua.UaError('Wrong format: `{}`, expected at least 4 '
                             'comma-separated values'.format(string))
        policy_class = getattr(security_policies, 'SecurityPolicy' + parts[0])
        mode = getattr(ua.MessageSecurityMode, parts[1])
        return self.set_security(policy_class, parts[2], parts[3],
                                 parts[4] if len(parts) >= 5 else None, mode)

    def set_security(self, policy, certificate_path, private_key_path,
                     server_certificate_path=None,
                     mode=ua.MessageSecurityMode.SignAndEncrypt):
        """
        Set SecureConnection mode.
        If server_certificate_path is None, the server certificate is
        fetched from the matching endpoint of the server, which requires
        a temporary connection.
        Call this before connect()
        """
        if server_certificate_path is None:
            # load certificate from server's list of endpoints
            endpoints = self.connect_and_get_server_endpoints()
            endpoint = Client.find_endpoint(endpoints, mode, policy.URI)
            server_cert = uacrypto.x509_from_der(endpoint.ServerCertificate)
        else:
            server_cert = uacrypto.load_certificate(server_certificate_path)
        cert = uacrypto.load_certificate(certificate_path)
        pk = uacrypto.load_private_key(private_key_path)
        self.security_policy = policy(server_cert, cert, pk, mode)
        self.uaclient.set_security(self.security_policy)

    def load_client_certificate(self, path):
        """
        load our certificate from file, either pem or der
        """
        self.user_certificate = uacrypto.load_certificate(path)

    def load_private_key(self, path):
        """
        load the private key of the user from file
        """
        self.user_private_key = uacrypto.load_private_key(path)

    def connect_and_get_server_endpoints(self):
        """
        Connect, ask server for endpoints, and disconnect
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            endpoints = self.get_endpoints()
            self.close_secure_channel()
        finally:
            # never leave the socket open, even if a request failed
            self.disconnect_socket()
        return endpoints

    def connect_and_find_servers(self):
        """
        Connect, ask server for a list of known servers, and disconnect
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()  # spec says it should not be necessary to open channel
            servers = self.find_servers()
            self.close_secure_channel()
        finally:
            # never leave the socket open, even if a request failed
            self.disconnect_socket()
        return servers

    def connect_and_find_servers_on_network(self):
        """
        Connect, ask server for a list of known servers on network, and disconnect
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            servers = self.find_servers_on_network()
            self.close_secure_channel()
        finally:
            # never leave the socket open, even if a request failed
            self.disconnect_socket()
        return servers

    def connect(self):
        """
        High level method
        Connect, create and activate session.
        User name and password are taken from the url, the user
        certificate from self.user_certificate.
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            self.create_session()
        except Exception:
            self.disconnect_socket()  # clean up the socket we just opened
            raise
        self.activate_session(username=self.server_url.username,
                              password=self.server_url.password,
                              certificate=self.user_certificate)

    def disconnect(self):
        """
        High level method
        Close session, secure channel and socket
        """
        try:
            self.close_session()
            self.close_secure_channel()
        finally:
            # the socket is closed even if the server did not answer properly
            self.disconnect_socket()

    def connect_socket(self):
        """
        connect to socket defined in url
        """
        self.uaclient.connect_socket(self.server_url.hostname, self.server_url.port)

    def disconnect_socket(self):
        """
        close the socket opened by connect_socket
        """
        self.uaclient.disconnect_socket()

    def send_hello(self):
        """
        Send OPC-UA hello to server
        """
        # FIXME the acknowledge returned by the server is not checked
        self.uaclient.send_hello(self.server_url.geturl())

    def open_secure_channel(self, renew=False):
        """
        Open secure channel, if renew is True, renew channel.
        self.secure_channel_timeout is updated with the lifetime
        revised by the server.
        """
        params = ua.OpenSecureChannelParameters()
        params.ClientProtocolVersion = 0
        if renew:
            params.RequestType = ua.SecurityTokenRequestType.Renew
        else:
            params.RequestType = ua.SecurityTokenRequestType.Issue
        params.SecurityMode = self.security_policy.Mode
        params.RequestedLifetime = self.secure_channel_timeout
        # length should be equal to the length of key of symmetric encryption
        nonce = utils.create_nonce(self.security_policy.symmetric_key_size)
        params.ClientNonce = nonce  # this nonce is used to create a symmetric key
        result = self.uaclient.open_secure_channel(params)
        self.security_policy.make_symmetric_key(nonce, result.ServerNonce)
        self.secure_channel_timeout = result.SecurityToken.RevisedLifetime

    def close_secure_channel(self):
        """
        close the secure channel
        """
        return self.uaclient.close_secure_channel()

    def get_endpoints(self):
        """
        send a GetEndpoints request for our url and return the answer of the server
        """
        params = ua.GetEndpointsParameters()
        params.EndpointUrl = self.server_url.geturl()
        return self.uaclient.get_endpoints(params)

    def register_server(self, server, discovery_configuration=None):
        """
        register a server to discovery server
        if discovery_configuration is provided, the newer register_server2 service call is used
        """
        serv = ua.RegisteredServer()
        serv.ServerUri = server.application_uri
        serv.ProductUri = server.product_uri
        serv.DiscoveryUrls = [server.endpoint.geturl()]
        serv.ServerType = server.application_type
        serv.ServerNames = [ua.LocalizedText(server.name)]
        serv.IsOnline = True
        if discovery_configuration:
            params = ua.RegisterServer2Parameters()
            params.Server = serv
            params.DiscoveryConfiguration = discovery_configuration
            return self.uaclient.register_server2(params)
        return self.uaclient.register_server(serv)

    def find_servers(self, uris=None):
        """
        send a FindServer request to the server. The answer should be a list of
        servers the server knows about
        A list of uris can be provided, only server having matching uris will be returned
        """
        if uris is None:
            uris = []
        params = ua.FindServersParameters()
        params.EndpointUrl = self.server_url.geturl()
        params.ServerUris = uris
        return self.uaclient.find_servers(params)

    def find_servers_on_network(self):
        """
        send a FindServersOnNetwork request with default parameters
        """
        params = ua.FindServersOnNetworkParameters()
        return self.uaclient.find_servers_on_network(params)

    def create_session(self):
        """
        Create a session on the server, check the answer of the server,
        remember the user token policies of the endpoint and start the
        keepalive thread. Return the response of the server.
        """
        desc = ua.ApplicationDescription()
        desc.ApplicationUri = self.application_uri
        desc.ProductUri = self.product_uri
        desc.ApplicationName = ua.LocalizedText(self.name)
        desc.ApplicationType = ua.ApplicationType.Client

        params = ua.CreateSessionParameters()
        nonce = utils.create_nonce(32)  # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
        params.ClientNonce = nonce
        params.ClientCertificate = self.security_policy.client_certificate
        params.ClientDescription = desc
        params.EndpointUrl = self.server_url.geturl()
        params.SessionName = self.description + " Session" + str(self._session_counter)
        # honour the public session_timeout member (defaults to 3600000)
        params.RequestedSessionTimeout = self.session_timeout
        params.MaxResponseMessageSize = 0  # means no max size
        response = self.uaclient.create_session(params)
        self.security_policy.asymmetric_cryptography.verify(self.security_policy.client_certificate + nonce, response.ServerSignature.Signature)
        self._server_nonce = response.ServerNonce
        if not self.security_policy.server_certificate:
            self.security_policy.server_certificate = response.ServerCertificate
        elif self.security_policy.server_certificate != response.ServerCertificate:
            # NOTE(review): ua.UaError is assumed to be exported by the ua
            # package (bare UaError was never imported here) - confirm.
            raise ua.UaError("Server certificate mismatch")
        # remember PolicyId's: we will use them in activate_session()
        ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
        self._policy_ids = ep.UserIdentityTokens
        self.session_timeout = response.RevisedSessionTimeout
        self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7)  # 0.7 is from spec
        self.keepalive.start()
        return response

    def server_policy_id(self, token_type, default):
        """
        Find PolicyId of server's UserTokenPolicy by token_type.
        Return default if there's no matching UserTokenPolicy.
        """
        for policy in self._policy_ids:
            if policy.TokenType == token_type:
                return policy.PolicyId
        return default

    def server_policy_uri(self, token_type):
        """
        Find SecurityPolicyUri of server's UserTokenPolicy by token_type.
        If SecurityPolicyUri is empty, use default SecurityPolicyUri
        of the endpoint
        """
        for policy in self._policy_ids:
            if policy.TokenType == token_type:
                if policy.SecurityPolicyUri:
                    return policy.SecurityPolicyUri
                # empty URI means "use this endpoint's policy URI"
                return self.security_policy.URI
        return self.security_policy.URI

    def activate_session(self, username=None, password=None, certificate=None):
        """
        Activate session using either username and password or private_key.
        Without username and certificate an anonymous token is sent.
        If a certificate is given it takes precedence over the username.
        """
        params = ua.ActivateSessionParameters()
        challenge = self.security_policy.server_certificate + self._server_nonce
        params.ClientSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(challenge)
        params.LocaleIds.append("en")
        if not username and not certificate:
            params.UserIdentityToken = ua.AnonymousIdentityToken()
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, b"anonymous")
        elif certificate:
            params.UserIdentityToken = ua.X509IdentityToken()
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, b"certificate_basic256")
            params.UserIdentityToken.CertificateData = uacrypto.der_from_x509(certificate)
            # specs part 4, 5.6.3.1: the data to sign is created by appending
            # the last serverNonce to the serverCertificate
            sig = uacrypto.sign_sha1(self.user_private_key, challenge)
            params.UserTokenSignature = ua.SignatureData()
            params.UserTokenSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
            params.UserTokenSignature.Signature = sig
        else:
            params.UserIdentityToken = ua.UserNameIdentityToken()
            params.UserIdentityToken.UserName = username
            # test the password argument, not the one of the url, so that
            # a password given explicitly by the caller is really sent
            if password:
                pubkey = uacrypto.x509_from_der(self.security_policy.server_certificate).public_key()
                # see specs part 4, 7.36.3: if the token is encrypted, password
                # shall be converted to UTF-8 and serialized with server nonce
                # (str.encode works with both python2 and python3)
                etoken = ua.pack_bytes(password.encode("utf8") + self._server_nonce)
                (data, uri) = security_policies.encrypt_asymmetric(
                    pubkey,
                    etoken,
                    self.server_policy_uri(ua.UserTokenType.UserName))
                params.UserIdentityToken.Password = data
                params.UserIdentityToken.EncryptionAlgorithm = uri
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, b"username_basic256")
        return self.uaclient.activate_session(params)

    def close_session(self):
        """
        Close session and stop the keepalive thread if there is one
        """
        if self.keepalive:
            self.keepalive.stop()
        return self.uaclient.close_session(True)

    def get_root_node(self):
        """
        Get the Node object of the root folder
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))

    def get_objects_node(self):
        """
        Get the Node object of the objects folder
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))

    def get_server_node(self):
        """
        Get the Node object of the server object
        """
        return self.get_node(ua.FourByteNodeId(ua.ObjectIds.Server))

    def get_node(self, nodeid):
        """
        Get node using NodeId object or a string representing a NodeId
        """
        return Node(self.uaclient, nodeid)

    def create_subscription(self, period, handler):
        """
        Create a subscription.
        returns a Subscription object which allows
        to subscribe to events or data on server
        period is used as requested publishing interval.
        handler argument is a class with data_change and/or event methods.
        These methods will be called when notifications from server are received.
        See example-client.py.
        Do not do expensive/slow or network operation from these methods
        since they are called directly from receiving thread. This is a design choice,
        start another thread if you need to do such a thing.
        """
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = period
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 10000
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.uaclient, params, handler)

    def get_namespace_array(self):
        """
        Read the value of the Server_NamespaceArray node
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        return ns_node.get_value()

    def get_namespace_index(self, uri):
        """
        Return the position of uri in the namespace array of the server.
        The ValueError of the index lookup propagates if uri is unknown.
        """
        uris = self.get_namespace_array()
        return uris.index(uri)

    def delete_nodes(self, nodes, recursive=False):
        """
        Delete the given nodes on the server, see opcua.common.manage_nodes.delete_nodes
        """
        return delete_nodes(self.uaclient, nodes, recursive)
444
            
445