Completed
Pull Request — master (#389)
by Olivier
03:22
created

Client.import_xml()   A

Complexity

Conditions 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 6
ccs 0
cts 0
cp 0
crap 2
rs 9.4285
1 1
from __future__ import division  # support for python2
2 1
from threading import Thread, Condition
3 1
import logging
4 1
try:
5 1
    from urllib.parse import urlparse
6
except ImportError:  # support for python2
7
    from urlparse import urlparse
8
9 1
from opcua import ua
10 1
from opcua.client.ua_client import UaClient
11 1
from opcua.common.xmlimporter import XmlImporter
12 1
from opcua.common.xmlexporter import XmlExporter
13 1
from opcua.common.node import Node
14 1
from opcua.common.manage_nodes import delete_nodes
15 1
from opcua.common.subscription import Subscription
16 1
from opcua.common import utils
17 1
from opcua.crypto import security_policies
18 1
from opcua.common.shortcuts import Shortcuts
19 1
use_crypto = True
20
try:
21
    from opcua.crypto import uacrypto
22
except ImportError:
23
    print("cryptography is not installed, use of crypto disabled")
24
    use_crypto = False
25 1
26
27
class KeepAlive(Thread):

    """
    Used by Client to keep the session open.
    OPCUA defines timeout both for sessions and secure channel
    """

    def __init__(self, client, timeout):
        """
        :param client: object whose ``open_secure_channel(renew=True)`` and
            ``get_node()`` methods are called periodically by ``run()``.
        :param timeout: period between two renewals, in milliseconds.
            A value of 0 is replaced by one hour.
        """
        Thread.__init__(self)
        self.logger = logging.getLogger(__name__)

        self.client = client
        self._dostop = False
        # protects _dostop and lets stop() interrupt the wait in run()
        self._cond = Condition()
        self.timeout = timeout

        # some server support no timeout, but we do not trust them
        if self.timeout == 0:
            self.timeout = 3600000  # 1 hour

    def run(self):
        """
        Renew the secure channel and read the server state every
        ``self.timeout`` milliseconds until ``stop()`` is called.
        """
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
        server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
        while True:
            with self._cond:
                # The flag is tested while holding the lock: a stop() issued
                # just before we start waiting would otherwise be missed and
                # the thread would sleep for a whole period before exiting.
                if not self._dostop:
                    self._cond.wait(self.timeout / 1000.0)
                stopping = self._dostop
            if stopping:
                break
            self.logger.debug("renewing channel")
            self.client.open_secure_channel(renew=True)
            val = server_state.get_value()
            self.logger.debug("server state is: %s ", val)
        self.logger.debug("keepalive thread has stopped")

    def stop(self):
        """
        Ask the thread to terminate and wake it up if it is waiting.
        Does not join the thread.
        """
        self.logger.debug("stoping keepalive thread")
        with self._cond:
            # set the flag under the lock so run() cannot miss the wake-up
            self._dostop = True
            self._cond.notify_all()
70 1
71
72
class Client(object):

    """
    High level client to connect to an OPC-UA server.

    This class makes it easy to connect and browse address space.
    It attempts to expose as much functionality as possible
    but if you want more flexibility it is possible and advised to
    use UaClient object, available as self.uaclient
    which offers the raw OPC-UA services interface.
    """

    def __init__(self, url, timeout=4):
        """

        :param url: url of the server.
            if you are unsure of url, write at least hostname
            and port and call get_endpoints

        :param timeout:
            Each request sent to the server expects an answer within this
            time. The timeout is specified in seconds.
        """
        self.logger = logging.getLogger(__name__)
        self.server_url = urlparse(url)
        self.name = "Pure Python Client"
        self.description = self.name
        self.application_uri = "urn:freeopcua:client"
        self.product_uri = "urn:freeopcua.github.no:client"
        self.security_policy = ua.SecurityPolicy()
        self.secure_channel_id = None
        self.secure_channel_timeout = 3600000  # 1 hour
        self.session_timeout = 3600000  # 1 hour
        self._policy_ids = []
        self.uaclient = UaClient(timeout)
        self.user_certificate = None
        self.user_private_key = None
        # set by create_session(); activate_session() checks it for None,
        # so it must exist even when no session has been created yet
        self._server_nonce = None
        self._session_counter = 1
        self.keepalive = None
        self.nodes = Shortcuts(self.uaclient)

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.disconnect()

    @staticmethod
    def find_endpoint(endpoints, security_mode, policy_uri):
        """
        Find endpoint with required security mode and policy URI

        Only endpoints whose url starts with ua.OPC_TCP_SCHEME are considered.
        Raises ua.UaError if no endpoint matches.
        """
        for ep in endpoints:
            if (ep.EndpointUrl.startswith(ua.OPC_TCP_SCHEME) and
                    ep.SecurityMode == security_mode and
                    ep.SecurityPolicyUri == policy_uri):
                return ep
        raise ua.UaError("No matching endpoints: {0}, {1}".format(
                         security_mode, policy_uri))

    def set_security_string(self, string):
        """
        Set SecureConnection mode. String format:
        Policy,Mode,certificate,private_key[,server_certificate]
        where Policy is Basic128Rsa15 or Basic256,
            Mode is Sign or SignAndEncrypt
            certificate, private_key and server_certificate are
                paths to .pem or .der files
        An empty string is ignored.
        Raises ua.UaError if fewer than 4 comma-separated values are given.
        Call this before connect()
        """
        if not string:
            return
        parts = string.split(',')
        if len(parts) < 4:
            raise ua.UaError('Wrong format: `{0}`, expected at least 4 comma-separated values'.format(string))
        policy_class = getattr(security_policies, 'SecurityPolicy' + parts[0])
        mode = getattr(ua.MessageSecurityMode, parts[1])
        return self.set_security(policy_class, parts[2], parts[3],
                                 parts[4] if len(parts) >= 5 else None, mode)

    def set_security(self, policy, certificate_path, private_key_path,
                     server_certificate_path=None,
                     mode=ua.MessageSecurityMode.SignAndEncrypt):
        """
        Set SecureConnection mode.
        If server_certificate_path is None, the server certificate is taken
        from the matching endpoint returned by the server, which requires a
        temporary connection.
        Call this before connect()
        """
        if server_certificate_path is None:
            # load certificate from server's list of endpoints
            endpoints = self.connect_and_get_server_endpoints()
            endpoint = Client.find_endpoint(endpoints, mode, policy.URI)
            server_cert = uacrypto.x509_from_der(endpoint.ServerCertificate)
        else:
            server_cert = uacrypto.load_certificate(server_certificate_path)
        cert = uacrypto.load_certificate(certificate_path)
        pk = uacrypto.load_private_key(private_key_path)
        self.security_policy = policy(server_cert, cert, pk, mode)
        self.uaclient.set_security(self.security_policy)

    def load_client_certificate(self, path):
        """
        load our certificate from file, either pem or der
        """
        self.user_certificate = uacrypto.load_certificate(path)

    def load_private_key(self, path):
        """
        Load user private key. This is used for authenticating using certificate
        """
        self.user_private_key = uacrypto.load_private_key(path)

    def connect_and_get_server_endpoints(self):
        """
        Connect, ask server for endpoints, and disconnect
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            endpoints = self.get_endpoints()
            self.close_secure_channel()
        finally:
            # never leave the socket open, even if a request failed
            self.disconnect_socket()
        return endpoints

    def connect_and_find_servers(self):
        """
        Connect, ask server for a list of known servers, and disconnect
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()  # spec says it should not be necessary to open channel
            servers = self.find_servers()
            self.close_secure_channel()
        finally:
            # never leave the socket open, even if a request failed
            self.disconnect_socket()
        return servers

    def connect_and_find_servers_on_network(self):
        """
        Connect, ask server for a list of known servers on network, and disconnect
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            servers = self.find_servers_on_network()
            self.close_secure_channel()
        finally:
            # never leave the socket open, even if a request failed
            self.disconnect_socket()
        return servers

    def connect(self):
        """
        High level method
        Connect, create and activate session
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            self.create_session()
        except Exception:
            # clean up the socket opened above before reporting the failure
            self.disconnect_socket()
            raise
        self.activate_session(username=self.server_url.username, password=self.server_url.password, certificate=self.user_certificate)

    def disconnect(self):
        """
        High level method
        Close session, secure channel and socket
        """
        try:
            self.close_session()
            self.close_secure_channel()
        finally:
            # the socket must be released even if the server did not answer
            self.disconnect_socket()

    def connect_socket(self):
        """
        connect to socket defined in url
        """
        self.uaclient.connect_socket(self.server_url.hostname, self.server_url.port)

    def disconnect_socket(self):
        """
        Close the socket opened by connect_socket()
        """
        self.uaclient.disconnect_socket()

    def send_hello(self):
        """
        Send OPC-UA hello to server
        """
        # FIXME check the acknowledge returned by send_hello
        self.uaclient.send_hello(self.server_url.geturl())

    def open_secure_channel(self, renew=False):
        """
        Open secure channel, if renew is True, renew channel
        """
        params = ua.OpenSecureChannelParameters()
        params.ClientProtocolVersion = 0
        params.RequestType = ua.SecurityTokenRequestType.Issue
        if renew:
            params.RequestType = ua.SecurityTokenRequestType.Renew
        params.SecurityMode = self.security_policy.Mode
        params.RequestedLifetime = self.secure_channel_timeout
        # length should be equal to the length of key of symmetric encryption
        nonce = utils.create_nonce(self.security_policy.symmetric_key_size)
        params.ClientNonce = nonce  # this nonce is used to create a symmetric key
        result = self.uaclient.open_secure_channel(params)
        self.security_policy.make_symmetric_key(nonce, result.ServerNonce)
        # remember the lifetime granted by the server
        self.secure_channel_timeout = result.SecurityToken.RevisedLifetime

    def close_secure_channel(self):
        """
        Close the secure channel
        """
        return self.uaclient.close_secure_channel()

    def get_endpoints(self):
        """
        Send a GetEndpoints request for the url of this client
        """
        params = ua.GetEndpointsParameters()
        params.EndpointUrl = self.server_url.geturl()
        return self.uaclient.get_endpoints(params)

    def register_server(self, server, discovery_configuration=None):
        """
        register a server to discovery server
        if discovery_configuration is provided, the newer register_server2 service call is used
        """
        serv = ua.RegisteredServer()
        serv.ServerUri = server.application_uri
        serv.ProductUri = server.product_uri
        serv.DiscoveryUrls = [server.endpoint.geturl()]
        serv.ServerType = server.application_type
        serv.ServerNames = [ua.LocalizedText(server.name)]
        serv.IsOnline = True
        if discovery_configuration:
            params = ua.RegisterServer2Parameters()
            params.Server = serv
            params.DiscoveryConfiguration = discovery_configuration
            return self.uaclient.register_server2(params)
        else:
            return self.uaclient.register_server(serv)

    def find_servers(self, uris=None):
        """
        send a FindServer request to the server. The answer should be a list of
        servers the server knows about
        A list of uris can be provided, only server having matching uris will be returned
        """
        if uris is None:
            uris = []
        params = ua.FindServersParameters()
        params.EndpointUrl = self.server_url.geturl()
        params.ServerUris = uris
        return self.uaclient.find_servers(params)

    def find_servers_on_network(self):
        """
        Send a FindServersOnNetwork request with default parameters
        """
        params = ua.FindServersOnNetworkParameters()
        return self.uaclient.find_servers_on_network(params)

    def create_session(self):
        """
        send a CreateSessionRequest to server with reasonable parameters.
        If you want to modify settings look at code of this method
        and make your own
        Starts the KeepAlive thread once the session is created.
        Raises ua.UaError if the server certificate does not match the one
        already known by the security policy.
        """
        desc = ua.ApplicationDescription()
        desc.ApplicationUri = self.application_uri
        desc.ProductUri = self.product_uri
        desc.ApplicationName = ua.LocalizedText(self.name)
        desc.ApplicationType = ua.ApplicationType.Client

        params = ua.CreateSessionParameters()
        nonce = utils.create_nonce(32)  # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
        params.ClientNonce = nonce
        params.ClientCertificate = self.security_policy.client_certificate
        params.ClientDescription = desc
        params.EndpointUrl = self.server_url.geturl()
        params.SessionName = self.description + " Session" + str(self._session_counter)
        # honour the configurable attribute (default is one hour)
        params.RequestedSessionTimeout = self.session_timeout
        params.MaxResponseMessageSize = 0  # means no max size
        response = self.uaclient.create_session(params)
        if self.security_policy.client_certificate is None:
            data = nonce
        else:
            data = self.security_policy.client_certificate + nonce
        self.security_policy.asymmetric_cryptography.verify(data, response.ServerSignature.Signature)
        self._server_nonce = response.ServerNonce
        if not self.security_policy.server_certificate:
            self.security_policy.server_certificate = response.ServerCertificate
        elif self.security_policy.server_certificate != response.ServerCertificate:
            raise ua.UaError("Server certificate mismatch")
        # remember PolicyId's: we will use them in activate_session()
        ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
        self._policy_ids = ep.UserIdentityTokens
        self.session_timeout = response.RevisedSessionTimeout
        self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7)  # 0.7 is from spec
        self.keepalive.start()
        return response

    def server_policy_id(self, token_type, default):
        """
        Find PolicyId of server's UserTokenPolicy by token_type.
        Return default if there's no matching UserTokenPolicy.
        """
        for policy in self._policy_ids:
            if policy.TokenType == token_type:
                return policy.PolicyId
        return default

    def server_policy_uri(self, token_type):
        """
        Find SecurityPolicyUri of server's UserTokenPolicy by token_type.
        If SecurityPolicyUri is empty, use default SecurityPolicyUri
        of the endpoint
        """
        for policy in self._policy_ids:
            if policy.TokenType == token_type:
                if policy.SecurityPolicyUri:
                    return policy.SecurityPolicyUri
                else:   # empty URI means "use this endpoint's policy URI"
                    return self.security_policy.URI
        return self.security_policy.URI

    def activate_session(self, username=None, password=None, certificate=None):
        """
        Activate session using either username and password or private_key
        Anonymous authentication is used when neither username nor
        certificate is given; a certificate takes precedence over a username.
        """
        params = ua.ActivateSessionParameters()
        challenge = b""
        if self.security_policy.server_certificate is not None:
            challenge += self.security_policy.server_certificate
        if self._server_nonce is not None:
            challenge += self._server_nonce
        params.ClientSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(challenge)
        params.LocaleIds.append("en")
        if not username and not certificate:
            self._add_anonymous_auth(params)
        elif certificate:
            self._add_certificate_auth(params, certificate, challenge)
        else:
            self._add_user_auth(params, username, password)
        return self.uaclient.activate_session(params)

    def _add_anonymous_auth(self, params):
        params.UserIdentityToken = ua.AnonymousIdentityToken()
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, b"anonymous")

    def _add_certificate_auth(self, params, certificate, challenge):
        params.UserIdentityToken = ua.X509IdentityToken()
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, b"certificate_basic256")
        params.UserIdentityToken.CertificateData = uacrypto.der_from_x509(certificate)
        # specs part 4, 5.6.3.1: the data to sign is created by appending
        # the last serverNonce to the serverCertificate
        sig = uacrypto.sign_sha1(self.user_private_key, challenge)
        params.UserTokenSignature = ua.SignatureData()
        params.UserTokenSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.UserTokenSignature.Signature = sig

    def _add_user_auth(self, params, username, password):
        params.UserIdentityToken = ua.UserNameIdentityToken()
        params.UserIdentityToken.UserName = username
        policy_uri = self.server_policy_uri(ua.UserTokenType.UserName)
        # The decision to send a password is based on the password argument,
        # not on the url, so that activate_session() can be called directly
        # with credentials which are not part of the url.
        if not policy_uri or policy_uri == security_policies.POLICY_NONE_URI:
            # see specs part 4, 7.36.3: if the token is NOT encrypted,
            # then the password only contains UTF-8 encoded password
            # and EncryptionAlgorithm is null
            if password:
                self.logger.warning("Sending plain-text password")
                params.UserIdentityToken.Password = password
            params.UserIdentityToken.EncryptionAlgorithm = ''
        elif password:
            data, uri = self._encrypt_password(password, policy_uri)
            params.UserIdentityToken.Password = data
            params.UserIdentityToken.EncryptionAlgorithm = uri
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, b"username_basic256")

    def _encrypt_password(self, password, policy_uri):
        pubkey = uacrypto.x509_from_der(self.security_policy.server_certificate).public_key()
        # see specs part 4, 7.36.3: if the token is encrypted, password
        # shall be converted to UTF-8 and serialized with server nonce
        passwd = password.encode("utf8")
        if self._server_nonce is not None:
            passwd += self._server_nonce
        etoken = ua.ua_binary.Primitives.Bytes.pack(passwd)
        data, uri = security_policies.encrypt_asymmetric(pubkey, etoken, policy_uri)
        return data, uri

    def close_session(self):
        """
        Close session
        The KeepAlive thread, if any, is asked to stop first.
        """
        if self.keepalive:
            self.keepalive.stop()
        return self.uaclient.close_session(True)

    def get_root_node(self):
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))

    def get_objects_node(self):
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))

    def get_server_node(self):
        return self.get_node(ua.FourByteNodeId(ua.ObjectIds.Server))

    def get_node(self, nodeid):
        """
        Get node using NodeId object or a string representing a NodeId
        """
        return Node(self.uaclient, nodeid)

    def create_subscription(self, period, handler):
        """
        Create a subscription.
        returns a Subscription object which allow
        to subscribe to events or data on server

        period argument is either a publishing interval or a
        CreateSubscriptionParameters instance. A plain interval is passed
        unchanged as RequestedPublishingInterval, which the OPC-UA
        specification expresses in milliseconds. The second option should
        be used if the opcua-server has problems with the default options.

        handler argument is a class with data_change and/or event methods.
        These methods will be called when notification from server are received.
        See example-client.py.
        Do not do expensive/slow or network operation from these methods
        since they are called directly from receiving thread. This is a design choice,
        start another thread if you need to do such a thing.
        """

        if isinstance(period, ua.CreateSubscriptionParameters):
            return Subscription(self.uaclient, period, handler)
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = period
        params.RequestedLifetimeCount = 10000
        params.RequestedMaxKeepAliveCount = 3000
        params.MaxNotificationsPerPublish = 10000
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.uaclient, params, handler)

    def get_namespace_array(self):
        """
        Return the value of the Server_NamespaceArray node
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        return ns_node.get_value()

    def get_namespace_index(self, uri):
        """
        Return the position of uri in the namespace array
        """
        uries = self.get_namespace_array()
        return uries.index(uri)

    def delete_nodes(self, nodes, recursive=False):
        return delete_nodes(self.uaclient, nodes, recursive)

    def import_xml(self, path):
        """
        Import nodes defined in xml
        """
        importer = XmlImporter(self)
        return importer.import_xml(path)

    def export_xml(self, nodes, path):
        """
        Export defined nodes to xml
        """
        exp = XmlExporter(self)
        exp.build_etree(nodes)
        return exp.write_xml(path)

    def register_namespace(self, uri):
        """
        Register a new namespace. Nodes should be in a custom namespace, not 0.
        This method is mainly implemented for symmetry with server
        Returns the index of the namespace; an already registered uri is
        not added a second time.
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        uries = ns_node.get_value()
        if uri in uries:
            return uries.index(uri)
        uries.append(uri)
        ns_node.set_value(uries)
        return len(uries) - 1
535
536
537