Completed
Pull Request — master (#388)
by Olivier
04:36
created

Client.import_structures()   B

Complexity

Conditions 5

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 23.225

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 5
c 1
b 0
f 0
dl 0
loc 15
ccs 1
cts 10
cp 0.1
crap 23.225
rs 8.5454
1 1
from __future__ import division  # support for python2
2 1
from threading import Thread, Condition
3 1
import logging
4 1
try:
5 1
    from urllib.parse import urlparse
6
except ImportError:  # support for python2
7
    from urlparse import urlparse
8
9 1
from opcua import ua
10 1
from opcua.client.ua_client import UaClient
11 1
from opcua.common.xmlimporter import XmlImporter
12 1
from opcua.common.xmlexporter import XmlExporter
13 1
from opcua.common.node import Node
14 1
from opcua.common.manage_nodes import delete_nodes
15 1
from opcua.common.subscription import Subscription
16 1
from opcua.common import utils
17 1
from opcua.crypto import security_policies
18 1
from opcua.common.shortcuts import Shortcuts
19 1
from opcua.common.structures_generator import StructGenerator
20 1
# Crypto support is optional: when opcua.crypto.uacrypto cannot be imported
# the module still loads and use_crypto records that crypto is unavailable.
use_crypto = True
try:
    from opcua.crypto import uacrypto
except ImportError:
    # Report through logging rather than print(): a library should not
    # write to stdout as a side effect of being imported.
    logging.getLogger(__name__).warning("cryptography is not installed, use of crypto disabled")
    use_crypto = False
26
27
28 1
class KeepAlive(Thread):

    """
    Used by Client to keep the session open.
    OPCUA defines timeout both for sessions and secure channel
    """

    def __init__(self, client, timeout):
        """
        :param client: object whose open_secure_channel(renew=True) is called
            periodically and whose get_node() is used to read the server state.
        :param timeout: period between two renewals, in milliseconds.
            A value of 0 is replaced by one hour.
        """
        Thread.__init__(self)
        self.logger = logging.getLogger(__name__)

        self.client = client
        self._dostop = False
        self._cond = Condition()
        self.timeout = timeout

        # some server support no timeout, but we do not trust them
        if self.timeout == 0:
            self.timeout = 3600000  # 1 hour

    def run(self):
        """
        Renew the secure channel and read the server state once per period,
        until stop() is called.
        """
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
        server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
        while True:
            with self._cond:
                # The stop flag is tested while holding the lock. Testing it
                # outside (as a plain loop condition) lets stop() set the flag
                # and notify between the test and wait(); the notification is
                # then lost and the thread sleeps for the whole period.
                if not self._dostop:
                    self._cond.wait(self.timeout / 1000)  # wait() expects seconds
                if self._dostop:
                    break
            self.logger.debug("renewing channel")
            self.client.open_secure_channel(renew=True)
            val = server_state.get_value()
            self.logger.debug("server state is: %s ", val)
        self.logger.debug("keepalive thread has stopped")

    def stop(self):
        """
        Ask the thread to terminate and wake it up if it is waiting.
        Does not wait for the thread to finish.
        """
        self.logger.debug("stoping keepalive thread")
        with self._cond:
            # set the flag under the same lock run() uses to test it
            self._dostop = True
            self._cond.notify_all()
71
72
73 1
class Client(object):
74
75
    """
76
    High level client to connect to an OPC-UA server.
77
78
    This class makes it easy to connect and browse address space.
79
    It attempts to expose as much functionality as possible
80
    but if you want more flexibility it is possible and advised to
81
    use UaClient object, available as self.uaclient
82
    which offers the raw OPC-UA services interface.
83
    """
84
85 1
    def __init__(self, url, timeout=4):
        """
        Store the connection settings for the server at ``url``.

        :param url: url of the server.
            if you are unsure of url, write at least hostname
            and port and call get_endpoints

        :param timeout:
            Each request sent to the server expects an answer within this
            time. The timeout is specified in seconds.
        """
        self.logger = logging.getLogger(__name__)
        self.server_url = urlparse(url)
        self.name = "Pure Python Client"
        self.description = self.name
        self.application_uri = "urn:freeopcua:client"
        self.product_uri = "urn:freeopcua.github.no:client"
        self.security_policy = ua.SecurityPolicy()
        self.secure_channel_id = None
        self.secure_channel_timeout = 3600000  # 1 hour
        self.session_timeout = 3600000  # 1 hour
        self._policy_ids = []
        # Overwritten by create_session(). Defined here because
        # activate_session() and _encrypt_password() test it against None and
        # would otherwise fail with AttributeError when no session was created.
        self._server_nonce = None
        self.uaclient = UaClient(timeout)
        self.user_certificate = None
        self.user_private_key = None
        self._session_counter = 1
        self.keepalive = None
        self.nodes = Shortcuts(self.uaclient)
113
114 1
    def __enter__(self):
        """
        Context manager entry: run connect() and hand back this client
        """
        self.connect()
        return self
117
118 1
    def __exit__(self, exc_type, exc_value, traceback):
        """
        Context manager exit: run disconnect(); exceptions are not suppressed
        """
        self.disconnect()
120
121 1
    @staticmethod
    def find_endpoint(endpoints, security_mode, policy_uri):
        """
        Return the first endpoint whose url starts with ua.OPC_TCP_SCHEME
        and whose security mode and policy URI equal the requested ones.
        Raise ua.UaError when no endpoint qualifies.
        """
        candidates = (
            endpoint for endpoint in endpoints
            if endpoint.EndpointUrl.startswith(ua.OPC_TCP_SCHEME)
            and endpoint.SecurityMode == security_mode
            and endpoint.SecurityPolicyUri == policy_uri
        )
        found = next(candidates, None)
        if found is None:
            message = "No matching endpoints: {0}, {1}".format(security_mode, policy_uri)
            raise ua.UaError(message)
        return found
133
134 1
    def set_security_string(self, string):
        """
        Configure security from a comma-separated string:
        Policy,Mode,certificate,private_key[,server_certificate]
        where Policy is the suffix of a SecurityPolicy* class of
            security_policies (e.g. Basic128Rsa15 or Basic256),
            Mode is the name of a ua.MessageSecurityMode member
            (e.g. Sign or SignAndEncrypt),
            certificate, private_key and the optional fifth field are
                paths handed over to set_security()
        An empty string is ignored. Fewer than 4 fields raise ua.UaError.
        Call this before connect()
        """
        if not string:
            return
        fields = string.split(',')
        if len(fields) < 4:
            raise ua.UaError('Wrong format: `{0}`, expected at least 4 comma-separated values'.format(string))
        policy_name, mode_name, certificate_path, private_key_path = fields[:4]
        server_certificate_path = fields[4] if len(fields) > 4 else None
        policy_class = getattr(security_policies, 'SecurityPolicy' + policy_name)
        mode = getattr(ua.MessageSecurityMode, mode_name)
        return self.set_security(policy_class, certificate_path, private_key_path,
                                 server_certificate_path, mode)
153
154 1
    def set_security(self, policy, certificate_path, private_key_path,
                     server_certificate_path=None,
                     mode=ua.MessageSecurityMode.SignAndEncrypt):
        """
        Build the security policy from certificate files and install it
        on the underlying UaClient.
        When no server certificate path is given, the certificate is taken
        from the matching endpoint returned by
        connect_and_get_server_endpoints().
        Call this before connect()
        """
        if server_certificate_path is not None:
            server_cert = uacrypto.load_certificate(server_certificate_path)
        else:
            # load certificate from server's list of endpoints
            available = self.connect_and_get_server_endpoints()
            matching = Client.find_endpoint(available, mode, policy.URI)
            server_cert = uacrypto.x509_from_der(matching.ServerCertificate)
        client_cert = uacrypto.load_certificate(certificate_path)
        private_key = uacrypto.load_private_key(private_key_path)
        self.security_policy = policy(server_cert, client_cert, private_key, mode)
        self.uaclient.set_security(self.security_policy)
172
173 1
    def load_client_certificate(self, path):
        """
        Load the user certificate from file (pem or der) and keep it
        in self.user_certificate
        """
        loaded = uacrypto.load_certificate(path)
        self.user_certificate = loaded
178
179 1
    def load_private_key(self, path):
        """
        Load the user private key from file and keep it in
        self.user_private_key. It is used when authenticating with a
        certificate.
        """
        loaded = uacrypto.load_private_key(path)
        self.user_private_key = loaded
184
185 1
    def connect_and_get_server_endpoints(self):
        """
        Connect, ask server for endpoints, and disconnect

        :return: whatever get_endpoints() returned
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            endpoints = self.get_endpoints()
            self.close_secure_channel()
        finally:
            # release the socket even when one of the steps above raised
            self.disconnect_socket()
        return endpoints
196
197 1
    def connect_and_find_servers(self):
        """
        Connect, ask server for a list of known servers, and disconnect

        :return: whatever find_servers() returned
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()  # spec says it should not be necessary to open channel
            servers = self.find_servers()
            self.close_secure_channel()
        finally:
            # release the socket even when one of the steps above raised
            self.disconnect_socket()
        return servers
208
209 1
    def connect_and_find_servers_on_network(self):
        """
        Connect, ask server for a list of known servers on network, and disconnect

        :return: whatever find_servers_on_network() returned
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            servers = self.find_servers_on_network()
            self.close_secure_channel()
        finally:
            # release the socket even when one of the steps above raised
            self.disconnect_socket()
        return servers
220
221 1
    def connect(self):
        """
        High level method
        Connect, create and activate session

        The session is activated with the username and password found in
        the server url and with self.user_certificate.
        If hello, channel opening or session creation fails, the socket is
        disconnected again before the exception is re-raised.
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            self.create_session()
        except Exception:
            # do not leave a half-initialised connection behind
            self.disconnect_socket()
            raise
        self.activate_session(username=self.server_url.username,
                              password=self.server_url.password,
                              certificate=self.user_certificate)
231
232 1
    def disconnect(self):
        """
        High level method
        Close session, secure channel and socket

        The socket is disconnected even when closing the session or the
        secure channel raises; the exception still propagates.
        """
        try:
            self.close_session()
            self.close_secure_channel()
        finally:
            self.disconnect_socket()
240
241 1
    def connect_socket(self):
        """
        Connect the underlying UaClient to the hostname and port
        parsed from the server url
        """
        target = self.server_url
        self.uaclient.connect_socket(target.hostname, target.port)
246
247 1
    def disconnect_socket(self):
        """
        Ask the underlying UaClient to disconnect its socket
        """
        transport = self.uaclient
        transport.disconnect_socket()
249
250 1
    def send_hello(self):
        """
        Send OPC-UA hello to server, passing the full server url
        """
        endpoint_url = self.server_url.geturl()
        # FIXME the acknowledge returned by the server is not checked
        self.uaclient.send_hello(endpoint_url)
256
257 1
    def open_secure_channel(self, renew=False):
        """
        Issue a new secure channel token, or renew the current one when
        renew is True. The symmetric keys of the security policy are
        derived from the exchanged nonces and secure_channel_timeout is
        updated with the lifetime revised by the server.
        """
        if renew:
            request_type = ua.SecurityTokenRequestType.Renew
        else:
            request_type = ua.SecurityTokenRequestType.Issue

        # the nonce is used to create a symmetric key, so its length should
        # be equal to the length of the key of the symmetric encryption
        client_nonce = utils.create_nonce(self.security_policy.symmetric_key_size)

        request = ua.OpenSecureChannelParameters()
        request.ClientProtocolVersion = 0
        request.RequestType = request_type
        request.SecurityMode = self.security_policy.Mode
        request.RequestedLifetime = self.secure_channel_timeout
        request.ClientNonce = client_nonce

        answer = self.uaclient.open_secure_channel(request)
        self.security_policy.make_symmetric_key(client_nonce, answer.ServerNonce)
        self.secure_channel_timeout = answer.SecurityToken.RevisedLifetime
273
274 1
    def close_secure_channel(self):
        """
        Close the secure channel through the underlying UaClient
        and return its result
        """
        outcome = self.uaclient.close_secure_channel()
        return outcome
276
277 1
    def get_endpoints(self):
        """
        Send a GetEndpoints request for the server url and return the
        answer of the underlying UaClient
        """
        request = ua.GetEndpointsParameters()
        request.EndpointUrl = self.server_url.geturl()
        return self.uaclient.get_endpoints(request)
281
282 1
    def register_server(self, server, discovery_configuration=None):
        """
        Register ``server`` with the discovery server this client talks to.
        The description is built from the server's application_uri,
        product_uri, endpoint, application_type and name.
        If discovery_configuration is provided, the newer register_server2
        service call is used, otherwise register_server.
        """
        registration = ua.RegisteredServer()
        registration.ServerUri = server.application_uri
        registration.ProductUri = server.product_uri
        registration.DiscoveryUrls = [server.endpoint.geturl()]
        registration.ServerType = server.application_type
        registration.ServerNames = [ua.LocalizedText(server.name)]
        registration.IsOnline = True

        if not discovery_configuration:
            return self.uaclient.register_server(registration)

        request = ua.RegisterServer2Parameters()
        request.Server = registration
        request.DiscoveryConfiguration = discovery_configuration
        return self.uaclient.register_server2(request)
301
302 1
    def find_servers(self, uris=None):
        """
        Send a FindServers request to the server. The answer should be the
        list of servers the server knows about.
        When a list of uris is given, only servers with matching uris
        should be returned; no list means an empty filter.
        """
        request = ua.FindServersParameters()
        request.EndpointUrl = self.server_url.geturl()
        request.ServerUris = [] if uris is None else uris
        return self.uaclient.find_servers(request)
314
315 1
    def find_servers_on_network(self):
        """
        Send a FindServersOnNetwork request with default parameters and
        return the answer of the underlying UaClient
        """
        request = ua.FindServersOnNetworkParameters()
        answer = self.uaclient.find_servers_on_network(request)
        return answer
318
319 1
    def create_session(self):
        """
        send a CreateSessionRequest to server with reasonable parameters.
        If you want to modify settings look at code of this method
        and make your own

        The requested session timeout is self.session_timeout, which is then
        replaced by the value revised by the server. On success a KeepAlive
        thread is started and the server response is returned.
        Raises ua.UaError when the server certificate differs from the one
        already configured in the security policy.
        """
        desc = ua.ApplicationDescription()
        desc.ApplicationUri = self.application_uri
        desc.ProductUri = self.product_uri
        desc.ApplicationName = ua.LocalizedText(self.name)
        desc.ApplicationType = ua.ApplicationType.Client

        params = ua.CreateSessionParameters()
        nonce = utils.create_nonce(32)  # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
        params.ClientNonce = nonce
        params.ClientCertificate = self.security_policy.client_certificate
        params.ClientDescription = desc
        params.EndpointUrl = self.server_url.geturl()
        params.SessionName = self.description + " Session" + str(self._session_counter)
        # honour the configurable attribute instead of a hard-coded hour
        # (its initial value is the same 3600000 ms)
        params.RequestedSessionTimeout = self.session_timeout
        params.MaxResponseMessageSize = 0  # means no max size
        response = self.uaclient.create_session(params)

        # the server signs our certificate (if any) followed by our nonce
        if self.security_policy.client_certificate is None:
            data = nonce
        else:
            data = self.security_policy.client_certificate + nonce
        self.security_policy.asymmetric_cryptography.verify(data, response.ServerSignature.Signature)
        self._server_nonce = response.ServerNonce
        if not self.security_policy.server_certificate:
            self.security_policy.server_certificate = response.ServerCertificate
        elif self.security_policy.server_certificate != response.ServerCertificate:
            raise ua.UaError("Server certificate mismatch")
        # remember PolicyId's: we will use them in activate_session()
        ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
        self._policy_ids = ep.UserIdentityTokens
        self.session_timeout = response.RevisedSessionTimeout
        # renew well before the shorter of the two timeouts expires
        self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7)  # 0.7 is from spec
        self.keepalive.start()
        return response
358
359 1
    def server_policy_id(self, token_type, default):
        """
        Return the PolicyId of the first remembered UserTokenPolicy whose
        TokenType equals token_type, or default when there is none.
        """
        matching_ids = (
            entry.PolicyId for entry in self._policy_ids
            if entry.TokenType == token_type
        )
        return next(matching_ids, default)
368
369 1
    def server_policy_uri(self, token_type):
        """
        Return the SecurityPolicyUri of the first remembered UserTokenPolicy
        whose TokenType equals token_type.
        The URI of the current security policy is returned instead when
        that SecurityPolicyUri is empty or when no policy matches.
        """
        for entry in self._policy_ids:
            if entry.TokenType != token_type:
                continue
            # empty URI means "use this endpoint's policy URI"
            return entry.SecurityPolicyUri or self.security_policy.URI
        return self.security_policy.URI
382
383 1
    def activate_session(self, username=None, password=None, certificate=None):
        """
        Activate the session. The identity is chosen in this order:
        certificate if one is given, else username and password if a
        username is given, else anonymous.
        Returns the answer of the underlying UaClient.
        """
        # data to sign: server certificate followed by the last server nonce
        challenge = b""
        for part in (self.security_policy.server_certificate, self._server_nonce):
            if part is not None:
                challenge += part

        params = ua.ActivateSessionParameters()
        params.ClientSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(challenge)
        params.LocaleIds.append("en")

        if certificate:
            self._add_certificate_auth(params, certificate, challenge)
        elif username:
            self._add_user_auth(params, username, password)
        else:
            self._add_anonymous_auth(params)
        return self.uaclient.activate_session(params)
403
404 1
    def _add_anonymous_auth(self, params):
        """
        Put an anonymous identity token into params, using the server's
        PolicyId for anonymous tokens (b"anonymous" when unknown)
        """
        token = ua.AnonymousIdentityToken()
        params.UserIdentityToken = token
        token.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, b"anonymous")
407
408 1
    def _add_certificate_auth(self, params, certificate, challenge):
        """
        Put an X509 identity token built from certificate into params and
        sign challenge with self.user_private_key.
        """
        token = ua.X509IdentityToken()
        params.UserIdentityToken = token
        token.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, b"certificate_basic256")
        token.CertificateData = uacrypto.der_from_x509(certificate)

        # specs part 4, 5.6.3.1: the data to sign is created by appending
        # the last serverNonce to the serverCertificate
        signed = uacrypto.sign_sha1(self.user_private_key, challenge)
        token_signature = ua.SignatureData()
        params.UserTokenSignature = token_signature
        token_signature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        token_signature.Signature = signed
418
419 1
    def _add_user_auth(self, params, username, password):
        """
        Put a username identity token into params.

        :param params: ActivateSessionParameters being filled in
        :param username: user name stored in the token
        :param password: password to send; when empty or None no password
            is set on the token

        The password is sent as given when the server's policy for username
        tokens is empty or the "None" policy, otherwise it is encrypted
        with _encrypt_password().
        """
        params.UserIdentityToken = ua.UserNameIdentityToken()
        params.UserIdentityToken.UserName = username
        policy_uri = self.server_policy_uri(ua.UserTokenType.UserName)
        # The decision is based on the password argument, not on the password
        # of the url: a password handed to activate_session() must be sent
        # even when the url carries none.
        if not policy_uri or policy_uri == security_policies.POLICY_NONE_URI:
            # see specs part 4, 7.36.3: if the token is NOT encrypted,
            # then the password only contains UTF-8 encoded password
            # and EncryptionAlgorithm is null
            if password:
                self.logger.warning("Sending plain-text password")
                params.UserIdentityToken.Password = password
            params.UserIdentityToken.EncryptionAlgorithm = ''
        elif password:
            data, uri = self._encrypt_password(password, policy_uri)
            params.UserIdentityToken.Password = data
            params.UserIdentityToken.EncryptionAlgorithm = uri
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, b"username_basic256")
436
437 1
    def _encrypt_password(self, password, policy_uri):
        """
        Encrypt password with the public key of the server certificate.
        Returns the pair given by security_policies.encrypt_asymmetric():
        encrypted data and encryption algorithm uri.
        """
        server_cert = uacrypto.x509_from_der(self.security_policy.server_certificate)
        pubkey = server_cert.public_key()
        # see specs part 4, 7.36.3: if the token is encrypted, password
        # shall be converted to UTF-8 and serialized with server nonce
        secret = password.encode("utf8")
        if self._server_nonce is not None:
            secret = secret + self._server_nonce
        packed = ua.ua_binary.Primitives.Bytes.pack(secret)
        encrypted, algorithm_uri = security_policies.encrypt_asymmetric(pubkey, packed, policy_uri)
        return encrypted, algorithm_uri
447
448 1
    def close_session(self):
        """
        Stop the keepalive thread, if any, then close the session through
        the underlying UaClient and return its result
        """
        worker = self.keepalive
        if worker:
            worker.stop()
        return self.uaclient.close_session(True)
455
456 1
    def get_root_node(self):
        """
        Return the node identified by ua.ObjectIds.RootFolder
        """
        root_id = ua.TwoByteNodeId(ua.ObjectIds.RootFolder)
        return self.get_node(root_id)
458
459 1
    def get_objects_node(self):
        """
        Return the node identified by ua.ObjectIds.ObjectsFolder
        """
        objects_id = ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder)
        return self.get_node(objects_id)
461
462 1
    def get_server_node(self):
        """
        Return the node identified by ua.ObjectIds.Server
        """
        server_id = ua.FourByteNodeId(ua.ObjectIds.Server)
        return self.get_node(server_id)
464
465 1
    def get_node(self, nodeid):
        """
        Build a Node bound to this client's UaClient.
        nodeid is a NodeId object or a string representing a NodeId.
        """
        node = Node(self.uaclient, nodeid)
        return node
470
471 1
    def create_subscription(self, period, handler):
        """
        Create a subscription.
        returns a Subscription object which allow
        to subscribe to events or data on server

        period argument is either a publishing interval or a
        CreateSubscriptionParameters instance. A plain interval is passed
        unchanged as RequestedPublishingInterval, which OPC-UA defines in
        milliseconds. The second option should be used
        if the opcua-server has problems with the default options.

        handler argument is a class with data_change and/or event methods.
        These methods will be called when notification from server are received.
        See example-client.py.
        Do not do expensive/slow or network operation from these methods
        since they are called directly from receiving thread. This is a design choice,
        start another thread if you need to do such a thing.
        """
        if isinstance(period, ua.CreateSubscriptionParameters):
            # caller supplied complete parameters: use them untouched
            return Subscription(self.uaclient, period, handler)
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = period
        params.RequestedLifetimeCount = 10000
        params.RequestedMaxKeepAliveCount = 3000
        params.MaxNotificationsPerPublish = 10000
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.uaclient, params, handler)
497
498 1
    def get_namespace_array(self):
        """
        Return the value of the Server_NamespaceArray node
        """
        array_id = ua.NodeId(ua.ObjectIds.Server_NamespaceArray)
        return self.get_node(array_id).get_value()
501
502 1
    def get_namespace_index(self, uri):
        """
        Return the position of uri in the namespace array of the server
        """
        return self.get_namespace_array().index(uri)
505
506 1
    def delete_nodes(self, nodes, recursive=False):
        """
        Forward to manage_nodes.delete_nodes with this client's UaClient
        and return its result
        """
        outcome = delete_nodes(self.uaclient, nodes, recursive)
        return outcome
508
509 1
    def import_xml(self, path):
        """
        Import the nodes defined in the xml file at path, using an
        XmlImporter bound to this client, and return its result
        """
        return XmlImporter(self).import_xml(path)
515
516 1
    def export_xml(self, nodes, path):
        """
        Export the given nodes to the xml file at path, using an
        XmlExporter bound to this client
        """
        exporter = XmlExporter(self)
        exporter.build_etree(nodes)
        written = exporter.write_xml(path)
        return written
523
524 1
    def register_namespace(self, uri):
        """
        Register a new namespace and return its index. Nodes should be in
        a custom namespace, not 0.
        An uri that is already registered is not added twice; its existing
        index is returned.
        This method is mainly implemented for symmetry with server
        """
        array_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        known_uris = array_node.get_value()
        if uri not in known_uris:
            known_uris.append(uri)
            array_node.set_value(known_uris)
            return len(known_uris) - 1
        return known_uris.index(uri)
536
537 1
    def import_structures(self, nodes=None):
        """
        Download xml from given variable node defining custom structures.
        If no node is given, every child of the "0:OPC Binary" node under
        self.nodes.base_data_type is used, except the one whose browse
        name is "opc.Ua".

        NOTE(review): this method looks unfinished. ``name`` is neither a
        parameter nor a local, and no module-level ``name`` is visible in
        this file, so the StructGenerator call below presumably raises
        NameError. The generator it creates is never used and nothing is
        returned. Confirm the intended behaviour before relying on it.
        """
        if not nodes:
            # no explicit nodes: collect the candidates from the address space
            nodes = []
            opc_bin = self.nodes.base_data_type.get_child("0:OPC Binary")
            for desc in opc_bin.get_children_descriptions():
                if desc.BrowseName != ua.QualifiedName("opc.Ua"):
                    nodes.append(self.get_node(desc.NodeId))

        for node in nodes:
            xml = node.get_value()
            # NOTE(review): ``name`` is undefined here - see docstring
            gen = StructGenerator(xml, name)
552
553
554
            
555