Completed
Push — master ( 4eddf2...83c23e )
by Olivier
04:06
created

Client.import_xml()   A

Complexity

Conditions 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 6
ccs 3
cts 3
cp 1
crap 1
rs 9.4285
1 1
from __future__ import division  # support for python2
2 1
from threading import Thread, Condition
3 1
import logging
4 1
try:
5 1
    from urllib.parse import urlparse
6
except ImportError:  # support for python2
7
    from urlparse import urlparse
8
9 1
from opcua import ua
10 1
from opcua.client.ua_client import UaClient
11 1
from opcua.common.xmlimporter import XmlImporter
12 1
from opcua.common.xmlexporter import XmlExporter
13 1
from opcua.common.node import Node
14 1
from opcua.common.manage_nodes import delete_nodes
15 1
from opcua.common.subscription import Subscription
16 1
from opcua.common import utils
17 1
from opcua.crypto import security_policies
18 1
from opcua.common.shortcuts import Shortcuts
19 1
use_crypto = True
20 1
try:
21 1
    from opcua.crypto import uacrypto
22
except ImportError:
23
    print("cryptography is not installed, use of crypto disabled")
24
    use_crypto = False
25
26
27 1
class KeepAlive(Thread):

    """
    Used by Client to keep the session open.
    OPCUA defines timeout both for sessions and secure channel
    """

    def __init__(self, client, timeout):
        """
        :param client: object providing get_node() and
            open_secure_channel(renew=True); the channel is renewed on it.
        :param timeout: period between two renewals, in milliseconds.
            A value of 0 is replaced by one hour.
        """
        Thread.__init__(self)
        self.logger = logging.getLogger(__name__)

        self.client = client
        self._dostop = False
        self._cond = Condition()
        self.timeout = timeout

        # some server support no timeout, but we do not trust them
        if self.timeout == 0:
            self.timeout = 3600000  # 1 hour

    def run(self):
        """
        Renew the secure channel and read the server state once per period
        until stop() is called.
        """
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
        server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
        while True:
            with self._cond:
                # The stop flag is tested while holding the lock so that a
                # stop() issued just before we start waiting is not missed
                # (its notify_all() would otherwise be lost and the thread
                # would sleep for a whole period before noticing).
                if not self._dostop:
                    self._cond.wait(self.timeout / 1000)
                if self._dostop:
                    break
            self.logger.debug("renewing channel")
            self.client.open_secure_channel(renew=True)
            val = server_state.get_value()
            self.logger.debug("server state is: %s ", val)
        self.logger.debug("keepalive thread has stopped")

    def stop(self):
        """
        Ask the thread to terminate and wake it up if it is waiting.
        """
        self.logger.debug("stoping keepalive thread")
        # Flag and notification are published under the same lock that
        # run() holds while testing the flag.
        with self._cond:
            self._dostop = True
            self._cond.notify_all()
70
71
72 1
class Client(object):
73
74
    """
75
    High level client to connect to an OPC-UA server.
76
77
    This class makes it easy to connect and browse address space.
78
    It attempts to expose as much functionality as possible
79
    but if you want more flexibility it is possible and advised to
80
    use UaClient object, available as self.uaclient
81
    which offers the raw OPC-UA services interface.
82
    """
83
84 1
    def __init__(self, url, timeout=4):
        """

        :param url: url of the server.
            if you are unsure of url, write at least hostname
            and port and call get_endpoints

        :param timeout:
            Each request sent to the server expects an answer within this
            time. The timeout is specified in seconds.
        """
        self.logger = logging.getLogger(__name__)
        self.server_url = urlparse(url)
        self.name = "Pure Python Client"
        self.description = self.name
        self.application_uri = "urn:freeopcua:client"
        self.product_uri = "urn:freeopcua.github.no:client"
        self.security_policy = ua.SecurityPolicy()
        self.secure_channel_id = None
        self.secure_channel_timeout = 3600000  # 1 hour
        self.session_timeout = 3600000  # 1 hour
        self._policy_ids = []
        # Overwritten by create_session(). Initialised here so that
        # activate_session(), which tests it against None, does not fail
        # with AttributeError when no session has been created yet.
        self._server_nonce = None
        self.uaclient = UaClient(timeout)
        self.user_certificate = None
        self.user_private_key = None
        self._session_counter = 1
        self.keepalive = None
        self.nodes = Shortcuts(self.uaclient)
112
113 1
    def __enter__(self):
        """
        Context-manager entry: connect and hand back the client itself.
        """
        self.connect()
        return self
116
117 1
    def __exit__(self, exc_type, exc_value, traceback):
        """
        Context-manager exit: disconnect. Exceptions are not suppressed.
        """
        self.disconnect()
119
120 1
    @staticmethod
    def find_endpoint(endpoints, security_mode, policy_uri):
        """
        Return the first opc.tcp endpoint of the list whose security mode
        and security policy URI both equal the requested ones.

        Raises ua.UaError when no endpoint matches.
        """
        for candidate in endpoints:
            if not candidate.EndpointUrl.startswith(ua.OPC_TCP_SCHEME):
                continue
            if candidate.SecurityMode == security_mode and candidate.SecurityPolicyUri == policy_uri:
                return candidate
        message = "No matching endpoints: {0}, {1}".format(security_mode, policy_uri)
        raise ua.UaError(message)
132
133 1
    def set_security_string(self, string):
        """
        Set SecureConnection mode from a comma-separated string:
        Policy,Mode,certificate,private_key[,server_certificate]
        where Policy is the suffix of a SecurityPolicy class of
            opcua.crypto.security_policies (e.g. Basic128Rsa15 or Basic256),
            Mode is a member name of ua.MessageSecurityMode
            (e.g. Sign or SignAndEncrypt),
            certificate, private_key and server_certificate are file paths
        An empty string (or None) leaves the security settings untouched.
        Call this before connect()
        """
        if not string:
            return None
        fields = string.split(',')
        if len(fields) < 4:
            raise ua.UaError('Wrong format: `{0}`, expected at least 4 comma-separated values'.format(string))
        policy_name, mode_name, certificate_path, private_key_path = fields[:4]
        # the fifth field is optional
        server_certificate_path = fields[4] if len(fields) >= 5 else None
        policy_class = getattr(security_policies, 'SecurityPolicy' + policy_name)
        mode = getattr(ua.MessageSecurityMode, mode_name)
        return self.set_security(policy_class, certificate_path, private_key_path,
                                 server_certificate_path, mode)
152
153 1
    def set_security(self, policy, certificate_path, private_key_path,
                     server_certificate_path=None,
                     mode=ua.MessageSecurityMode.SignAndEncrypt):
        """
        Set SecureConnection mode.
        When no server certificate path is given, the certificate is taken
        from the matching endpoint returned by the server itself.
        Call this before connect()
        """
        if server_certificate_path is not None:
            server_cert = uacrypto.load_certificate(server_certificate_path)
        else:
            # load certificate from server's list of endpoints
            known_endpoints = self.connect_and_get_server_endpoints()
            matching = Client.find_endpoint(known_endpoints, mode, policy.URI)
            server_cert = uacrypto.x509_from_der(matching.ServerCertificate)
        client_cert = uacrypto.load_certificate(certificate_path)
        private_key = uacrypto.load_private_key(private_key_path)
        self.security_policy = policy(server_cert, client_cert, private_key, mode)
        self.uaclient.set_security(self.security_policy)
171
172 1
    def load_client_certificate(self, path):
        """
        Load the user certificate from a file (pem or der) and keep it
        as self.user_certificate
        """
        loaded = uacrypto.load_certificate(path)
        self.user_certificate = loaded
177
178 1
    def load_private_key(self, path):
        """
        Load the user private key from a file and keep it as
        self.user_private_key. It is used to sign the challenge when
        authenticating with a certificate.
        """
        loaded = uacrypto.load_private_key(path)
        self.user_private_key = loaded
183
184 1
    def connect_and_get_server_endpoints(self):
        """
        Connect, ask server for endpoints, and disconnect

        The socket is released even when one of the steps raises.
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            endpoints = self.get_endpoints()
            self.close_secure_channel()
        finally:
            # never leave the socket open behind us
            self.disconnect_socket()
        return endpoints
195
196 1
    def connect_and_find_servers(self):
        """
        Connect, ask server for a list of known servers, and disconnect

        The socket is released even when one of the steps raises.
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()  # spec says it should not be necessary to open channel
            servers = self.find_servers()
            self.close_secure_channel()
        finally:
            # never leave the socket open behind us
            self.disconnect_socket()
        return servers
207
208 1
    def connect_and_find_servers_on_network(self):
        """
        Connect, ask server for a list of known servers on network, and disconnect

        The socket is released even when one of the steps raises.
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            servers = self.find_servers_on_network()
            self.close_secure_channel()
        finally:
            # never leave the socket open behind us
            self.disconnect_socket()
        return servers
219
220 1
    def connect(self):
        """
        High level method
        Connect, create and activate session

        If a step fails, everything that was already set up (session with
        its keepalive thread, secure channel, socket) is torn down again
        before the original exception is re-raised.
        """
        undo = []  # teardown calls for the stages completed so far

        def rollback():
            # Runs in its own frame so that an error raised and handled
            # here cannot replace the exception being re-raised by the
            # caller (matters on python2).
            for step in reversed(undo):
                try:
                    step()
                except Exception:
                    self.logger.exception("error while cleaning up after failed connect")

        try:
            self.connect_socket()
            undo.append(self.disconnect_socket)
            self.send_hello()
            self.open_secure_channel()
            undo.append(self.close_secure_channel)
            self.create_session()
            undo.append(self.close_session)
            self.activate_session(username=self.server_url.username, password=self.server_url.password, certificate=self.user_certificate)
        except Exception:
            rollback()
            raise
230
231 1
    def disconnect(self):
        """
        High level method
        Close session, secure channel and socket

        The socket is closed even when closing the session or the secure
        channel raises; the exception is still propagated.
        """
        try:
            self.close_session()
            self.close_secure_channel()
        finally:
            self.disconnect_socket()
239
240 1
    def connect_socket(self):
        """
        Open the socket to the host and port found in the server url
        """
        target = self.server_url
        self.uaclient.connect_socket(target.hostname, target.port)
245
246 1
    def disconnect_socket(self):
        """
        Close the socket through the underlying UaClient
        """
        self.uaclient.disconnect_socket()
248
249 1
    def send_hello(self):
        """
        Send the OPC-UA hello message for the configured server url
        """
        endpoint_url = self.server_url.geturl()
        # FIXME check ack (the value returned by uaclient.send_hello is ignored)
        self.uaclient.send_hello(endpoint_url)
255
256 1
    def open_secure_channel(self, renew=False):
        """
        Issue a new secure channel token or, if renew is True, renew the
        current one. The lifetime revised by the server is stored in
        self.secure_channel_timeout.
        """
        request_type = ua.SecurityTokenRequestType.Renew if renew else ua.SecurityTokenRequestType.Issue

        params = ua.OpenSecureChannelParameters()
        params.ClientProtocolVersion = 0
        params.RequestType = request_type
        params.SecurityMode = self.security_policy.Mode
        params.RequestedLifetime = self.secure_channel_timeout
        # the nonce is used to derive the symmetric key, so its length
        # has to equal the key length of the symmetric encryption
        client_nonce = utils.create_nonce(self.security_policy.symmetric_key_size)
        params.ClientNonce = client_nonce

        answer = self.uaclient.open_secure_channel(params)
        self.security_policy.make_symmetric_key(client_nonce, answer.ServerNonce)
        self.secure_channel_timeout = answer.SecurityToken.RevisedLifetime
272
273 1
    def close_secure_channel(self):
        """
        Close the secure channel; returns whatever the UaClient returns
        """
        outcome = self.uaclient.close_secure_channel()
        return outcome
275
276 1
    def get_endpoints(self):
        """
        Send a GetEndpoints request for the configured server url and
        return the answer of the UaClient
        """
        request = ua.GetEndpointsParameters()
        request.EndpointUrl = self.server_url.geturl()
        return self.uaclient.get_endpoints(request)
280
281 1
    def register_server(self, server, discovery_configuration=None):
        """
        Register a server to the discovery server we are connected to.
        The RegisterServer service is used, unless a discovery_configuration
        is provided: then the newer RegisterServer2 service is called.
        """
        registration = ua.RegisteredServer()
        registration.ServerUri = server.application_uri
        registration.ProductUri = server.product_uri
        registration.DiscoveryUrls = [server.endpoint.geturl()]
        registration.ServerType = server.application_type
        registration.ServerNames = [ua.LocalizedText(server.name)]
        registration.IsOnline = True

        if not discovery_configuration:
            return self.uaclient.register_server(registration)

        request = ua.RegisterServer2Parameters()
        request.Server = registration
        request.DiscoveryConfiguration = discovery_configuration
        return self.uaclient.register_server2(request)
300
301 1
    def find_servers(self, uris=None):
        """
        Send a FindServers request to the server. The answer should be the
        list of servers the server knows about.
        If a list of uris is given, it is sent as ServerUris so that only
        servers with a matching uri are returned.
        """
        request = ua.FindServersParameters()
        request.EndpointUrl = self.server_url.geturl()
        request.ServerUris = [] if uris is None else uris
        return self.uaclient.find_servers(request)
313
314 1
    def find_servers_on_network(self):
        """
        Send a FindServersOnNetwork request with default parameters
        """
        request = ua.FindServersOnNetworkParameters()
        return self.uaclient.find_servers_on_network(request)
317
318 1
    def create_session(self):
        """
        send a CreateSessionRequest to server with reasonable parameters.
        If you want to modify settings look at code of this method
        and make your own

        The requested session timeout is self.session_timeout (in
        milliseconds), so it can be tuned before connecting; the value
        revised by the server is written back to self.session_timeout.
        A KeepAlive thread is started once the session is created.
        Raises ua.UaError if the server certificate does not match the
        one already known to the security policy.
        """
        desc = ua.ApplicationDescription()
        desc.ApplicationUri = self.application_uri
        desc.ProductUri = self.product_uri
        desc.ApplicationName = ua.LocalizedText(self.name)
        desc.ApplicationType = ua.ApplicationType.Client

        params = ua.CreateSessionParameters()
        nonce = utils.create_nonce(32)  # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
        params.ClientNonce = nonce
        params.ClientCertificate = self.security_policy.client_certificate
        params.ClientDescription = desc
        params.EndpointUrl = self.server_url.geturl()
        params.SessionName = self.description + " Session" + str(self._session_counter)
        # honour the configurable attribute instead of a hard-coded hour
        params.RequestedSessionTimeout = self.session_timeout
        params.MaxResponseMessageSize = 0  # means no max size
        response = self.uaclient.create_session(params)
        # the server signs our certificate (if any) followed by our nonce
        if self.security_policy.client_certificate is None:
            data = nonce
        else:
            data = self.security_policy.client_certificate + nonce
        self.security_policy.asymmetric_cryptography.verify(data, response.ServerSignature.Signature)
        self._server_nonce = response.ServerNonce
        if not self.security_policy.server_certificate:
            self.security_policy.server_certificate = response.ServerCertificate
        elif self.security_policy.server_certificate != response.ServerCertificate:
            raise ua.UaError("Server certificate mismatch")
        # remember PolicyId's: we will use them in activate_session()
        ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
        self._policy_ids = ep.UserIdentityTokens
        self.session_timeout = response.RevisedSessionTimeout
        self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7)  # 0.7 is from spec
        self.keepalive.start()
        return response
357
358 1
    def server_policy_id(self, token_type, default):
        """
        Return the PolicyId of the first server UserTokenPolicy whose
        TokenType equals token_type, or default when none matches.
        """
        matching_ids = (candidate.PolicyId
                        for candidate in self._policy_ids
                        if candidate.TokenType == token_type)
        return next(matching_ids, default)
367
368 1
    def server_policy_uri(self, token_type):
        """
        Return the SecurityPolicyUri of the first server UserTokenPolicy
        whose TokenType equals token_type.
        When there is no such policy, or its SecurityPolicyUri is empty
        (meaning "use this endpoint's policy URI"), the URI of the current
        security policy is returned instead.
        """
        first_match = next(
            (candidate for candidate in self._policy_ids
             if candidate.TokenType == token_type),
            None)
        if first_match is not None and first_match.SecurityPolicyUri:
            return first_match.SecurityPolicyUri
        return self.security_policy.URI
381
382 1
    def activate_session(self, username=None, password=None, certificate=None):
        """
        Activate the session.
        A certificate, when given, takes precedence; otherwise username and
        password are used; with neither, the session is activated
        anonymously.
        """
        params = ua.ActivateSessionParameters()

        # data to sign: server certificate followed by the last server nonce
        to_sign = b""
        if self.security_policy.server_certificate is not None:
            to_sign += self.security_policy.server_certificate
        if self._server_nonce is not None:
            to_sign += self._server_nonce

        params.ClientSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(to_sign)
        params.LocaleIds.append("en")

        if certificate:
            self._add_certificate_auth(params, certificate, to_sign)
        elif username:
            self._add_user_auth(params, username, password)
        else:
            self._add_anonymous_auth(params)
        return self.uaclient.activate_session(params)
402
403 1
    def _add_anonymous_auth(self, params):
        """
        Put an anonymous identity token into params
        """
        token = params.UserIdentityToken = ua.AnonymousIdentityToken()
        token.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, b"anonymous")
406
407 1
    def _add_certificate_auth(self, params, certificate, challenge):
        """
        Put an X509 identity token for certificate into params, together
        with the signature of challenge made with self.user_private_key
        """
        token = params.UserIdentityToken = ua.X509IdentityToken()
        token.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, b"certificate_basic256")
        token.CertificateData = uacrypto.der_from_x509(certificate)
        # specs part 4, 5.6.3.1: the data to sign is created by appending
        # the last serverNonce to the serverCertificate
        signature = uacrypto.sign_sha1(self.user_private_key, challenge)
        signed = params.UserTokenSignature = ua.SignatureData()
        signed.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        signed.Signature = signature
417
418 1
    def _add_user_auth(self, params, username, password):
        """
        Put a UserName identity token for username into params.

        The password is attached only when one was passed in: in plain
        text if the user token policy is empty or the "None" policy,
        otherwise encrypted with the server certificate.
        """
        params.UserIdentityToken = ua.UserNameIdentityToken()
        params.UserIdentityToken.UserName = username
        policy_uri = self.server_policy_uri(ua.UserTokenType.UserName)
        # The decision is taken on the password argument, not on the
        # password embedded in the server url: activate_session() may be
        # called directly with credentials that are not part of the url.
        if not policy_uri or policy_uri == security_policies.POLICY_NONE_URI:
            # see specs part 4, 7.36.3: if the token is NOT encrypted,
            # then the password only contains UTF-8 encoded password
            # and EncryptionAlgorithm is null
            if password:
                self.logger.warning("Sending plain-text password")
                params.UserIdentityToken.Password = password
            params.UserIdentityToken.EncryptionAlgorithm = ''
        elif password:
            data, uri = self._encrypt_password(password, policy_uri)
            params.UserIdentityToken.Password = data
            params.UserIdentityToken.EncryptionAlgorithm = uri
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, b"username_basic256")
435
436 1
    def _encrypt_password(self, password, policy_uri):
        """
        Encrypt password with the public key of the server certificate.
        Returns the pair (encrypted data, encryption algorithm uri) given
        by security_policies.encrypt_asymmetric.
        """
        server_cert = uacrypto.x509_from_der(self.security_policy.server_certificate)
        public_key = server_cert.public_key()
        # see specs part 4, 7.36.3: if the token is encrypted, password
        # shall be converted to UTF-8 and serialized with server nonce
        secret = password.encode("utf8")
        if self._server_nonce is not None:
            secret += self._server_nonce
        packed = ua.ua_binary.Primitives.Bytes.pack(secret)
        return security_policies.encrypt_asymmetric(public_key, packed, policy_uri)
446
447 1
    def close_session(self):
        """
        Stop the keepalive thread, if any, and close the session
        """
        keepalive = self.keepalive
        if keepalive:
            keepalive.stop()
        return self.uaclient.close_session(True)
454
455 1
    def get_root_node(self):
        """
        Return the Node for ObjectIds.RootFolder
        """
        root_id = ua.TwoByteNodeId(ua.ObjectIds.RootFolder)
        return self.get_node(root_id)
457
458 1
    def get_objects_node(self):
        """
        Return the Node for ObjectIds.ObjectsFolder
        """
        objects_id = ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder)
        return self.get_node(objects_id)
460
461 1
    def get_server_node(self):
        """
        Return the Node for ObjectIds.Server
        """
        server_id = ua.FourByteNodeId(ua.ObjectIds.Server)
        return self.get_node(server_id)
463
464 1
    def get_node(self, nodeid):
        """
        Build a Node bound to this client's UaClient from a NodeId object
        or a string representing a NodeId
        """
        node = Node(self.uaclient, nodeid)
        return node
469
470 1
    def create_subscription(self, period, handler):
        """
        Create a subscription.
        returns a Subscription object which allow
        to subscribe to events or data on server

        period argument is either a publishing interval or a
        CreateSubscriptionParameters instance. A plain interval is passed
        unchanged as RequestedPublishingInterval (the OPC UA specification
        expresses that interval in milliseconds) and combined with default
        options. The second form should be used if the opcua-server has
        problems with the default options.

        handler argument is a class with data_change and/or event methods.
        These methods will be called when notification from server are
        received. See example-client.py.
        Do not do expensive/slow or network operation from these methods
        since they are called directly from receiving thread. This is a
        design choice, start another thread if you need to do such a thing.
        """
        if isinstance(period, ua.CreateSubscriptionParameters):
            # caller supplied complete parameters: use them as they are
            params = period
        else:
            params = ua.CreateSubscriptionParameters()
            params.RequestedPublishingInterval = period
            params.RequestedLifetimeCount = 10000
            params.RequestedMaxKeepAliveCount = 3000
            params.MaxNotificationsPerPublish = 10000
            params.PublishingEnabled = True
            params.Priority = 0
        return Subscription(self.uaclient, params, handler)
496
497 1
    def get_namespace_array(self):
        """
        Read and return the value of the Server_NamespaceArray node
        """
        array_id = ua.NodeId(ua.ObjectIds.Server_NamespaceArray)
        return self.get_node(array_id).get_value()
500
501 1
    def get_namespace_index(self, uri):
        """
        Return the position of uri in the namespace array of the server.
        The lookup uses index() of the returned array, so an unknown uri
        raises whatever that call raises (ValueError for a list).
        """
        return self.get_namespace_array().index(uri)
504
505 1
    def delete_nodes(self, nodes, recursive=False):
        """
        Delete nodes through opcua.common.manage_nodes.delete_nodes,
        using this client's UaClient
        """
        outcome = delete_nodes(self.uaclient, nodes, recursive)
        return outcome
507
508 1
    def import_xml(self, path):
        """
        Import the nodes defined in the xml file at path, using an
        XmlImporter bound to this client
        """
        return XmlImporter(self).import_xml(path)
514
515 1
    def export_xml(self, nodes, path):
        """
        Export the given nodes to the xml file at path, using an
        XmlExporter bound to this client
        """
        exporter = XmlExporter(self)
        exporter.build_etree(nodes)
        outcome = exporter.write_xml(path)
        return outcome
522
523 1
    def register_namespace(self, uri):
        """
        Register a new namespace and return its index. Nodes should be in
        a custom namespace, not 0.
        If uri is already in the namespace array of the server, its
        existing index is returned and nothing is written.
        This method is mainly implemented for symmetry with server
        """
        array_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        namespaces = array_node.get_value()
        if uri not in namespaces:
            namespaces.append(uri)
            array_node.set_value(namespaces)
            # the new uri sits at the end of the array
            return len(namespaces) - 1
        return namespaces.index(uri)
535
536
537