Passed
Push — master ( 9e8dfd...0c7678 )
by Olivier
04:43
created

Client.load_enums()   A

Complexity

Conditions 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.125

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 6
ccs 1
cts 2
cp 0.5
crap 1.125
rs 9.4285
1 1
from __future__ import division  # support for python2
2 1
from threading import Thread, Condition
3 1
import logging
4 1
try:
5 1
    from urllib.parse import urlparse
6
except ImportError:  # support for python2
7
    from urlparse import urlparse
8
9 1
from opcua import ua
10 1
from opcua.client.ua_client import UaClient
11 1
from opcua.common.xmlimporter import XmlImporter
12 1
from opcua.common.xmlexporter import XmlExporter
13 1
from opcua.common.node import Node
14 1
from opcua.common.manage_nodes import delete_nodes
15 1
from opcua.common.subscription import Subscription
16 1
from opcua.common import utils
17 1
from opcua.crypto import security_policies
18 1
from opcua.common.shortcuts import Shortcuts
19 1
from opcua.common.structures import load_type_definitions, load_enums
20 1
use_crypto = True
21 1
try:
22 1
    from opcua.crypto import uacrypto
23
except ImportError:
24
    logging.getLogger(__name__).warning("cryptography is not installed, use of crypto disabled")
25
    use_crypto = False
26
27
28 1
class KeepAlive(Thread):

    """
    Used by Client to keep the session open.
    OPCUA defines timeout both for sessions and secure channel
    """

    def __init__(self, client, timeout):
        """
        :param client: Client whose secure channel is renewed periodically.
        :param timeout: Period between two renewals, in milliseconds.
            A value of 0 is replaced by one hour.
        """
        Thread.__init__(self)
        self.logger = logging.getLogger(__name__)

        self.client = client
        self._dostop = False
        # protects _dostop and wakes up run() when stop() is called
        self._cond = Condition()
        self.timeout = timeout

        # some server support no timeout, but we do not trust them
        if self.timeout == 0:
            self.timeout = 3600000  # 1 hour

    def run(self):
        """
        Renew the secure channel and read the server state once per period,
        until stop() is called.
        """
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
        server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
        while True:
            with self._cond:
                # The flag is tested while holding the lock: a stop() issued
                # right before we start waiting can then not be missed.
                if not self._dostop:
                    self._cond.wait(self.timeout / 1000)  # wait() expects seconds
                if self._dostop:
                    break
            self.logger.debug("renewing channel")
            self.client.open_secure_channel(renew=True)
            val = server_state.get_value()
            self.logger.debug("server state is: %s ", val)
        self.logger.debug("keepalive thread has stopped")

    def stop(self):
        """
        Ask the thread to terminate and wake it up if it is waiting.
        """
        self.logger.debug("stoping keepalive thread")
        with self._cond:
            # set under the lock so run() either sees the flag or gets notified
            self._dostop = True
            self._cond.notify_all()
71
72
73 1
class Client(object):
74
75
    """
76
    High level client to connect to an OPC-UA server.
77
78
    This class makes it easy to connect and browse address space.
79
    It attempts to expose as much functionality as possible
80
    but if you want more flexibility it is possible and advised to
81
    use UaClient object, available as self.uaclient
82
    which offers the raw OPC-UA services interface.
83
    """
84
85 1
    def __init__(self, url, timeout=4):
        """

        :param url: url of the server.
            if you are unsure of url, write at least hostname
            and port and call get_endpoints

        :param timeout:
            Each request sent to the server expects an answer within this
            time. The timeout is specified in seconds.
        """
        self.logger = logging.getLogger(__name__)

        # server address; credentials embedded in the url are the initial ones
        parsed_url = urlparse(url)
        self.server_url = parsed_url
        self._username = parsed_url.username
        self._password = parsed_url.password

        # how this application describes itself when a session is created
        self.name = "Pure Python Client"
        self.description = self.name
        self.application_uri = "urn:freeopcua:client"
        self.product_uri = "urn:freeopcua.github.io:client"

        # secure channel and session state
        one_hour = 3600000
        self.security_policy = ua.SecurityPolicy()
        self.secure_channel_id = None
        self.secure_channel_timeout = one_hour
        self.session_timeout = one_hour
        self._policy_ids = []
        self._server_nonce = None
        self._session_counter = 1
        self.keepalive = None

        # material for certificate based user authentication
        self.user_certificate = None
        self.user_private_key = None

        # limits sent with the hello message
        self.max_messagesize = 0  # No limits
        self.max_chunkcount = 0  # No limits

        # low level client and node shortcuts built on top of it
        self.uaclient = UaClient(timeout)
        self.nodes = Shortcuts(self.uaclient)
119
120 1
    def __enter__(self):
        """
        Enter a ``with`` block: connect and hand back the client itself.
        """
        self.connect()
        client = self
        return client
123
124 1
    def __exit__(self, exc_type, exc_value, traceback):
        """
        Leave a ``with`` block: disconnect from the server.
        Nothing is returned, so an exception raised in the block propagates.
        """
        self.disconnect()
126
127 1
    @staticmethod
    def find_endpoint(endpoints, security_mode, policy_uri):
        """
        Return the first endpoint whose url starts with ua.OPC_TCP_SCHEME and
        whose security mode and security policy URI are the requested ones.
        Raise ua.UaError when no endpoint matches.
        """
        wanted = (security_mode, policy_uri)
        for candidate in endpoints:
            if not candidate.EndpointUrl.startswith(ua.OPC_TCP_SCHEME):
                continue
            if (candidate.SecurityMode, candidate.SecurityPolicyUri) == wanted:
                return candidate
        raise ua.UaError("No matching endpoints: {0}, {1}".format(security_mode, policy_uri))
138
139 1
    def set_user(self, username):
        """
        Store the user name used by connect() to activate the session.
        It replaces the user name taken from the URL.
        """
        self._username = username
145
146 1
    def set_password(self, pwd):
        """
        Store the password used by connect() to activate the session.
        It replaces the password taken from the URL.
        """
        self._password = pwd
152
153 1
    def set_security_string(self, string):
        """
        Set SecureConnection mode. String format:
        Policy,Mode,certificate,private_key[,server_certificate]
        where Policy is Basic128Rsa15 or Basic256,
            Mode is Sign or SignAndEncrypt
            certificate, private_key and server_certificate are
                paths to .pem or .der files
        An empty string is ignored.
        Call this before connect()
        """
        if not string:
            return
        fields = string.split(',')
        if len(fields) < 4:
            raise ua.UaError('Wrong format: `{0}`, expected at least 4 comma-separated values'.format(string))
        policy_name, mode_name, certificate, private_key = fields[:4]
        # the fifth value is optional
        server_certificate = fields[4] if len(fields) >= 5 else None
        # names are resolved to the policy class and to the enum member
        policy_class = getattr(security_policies, 'SecurityPolicy' + policy_name)
        mode = getattr(ua.MessageSecurityMode, mode_name)
        return self.set_security(policy_class, certificate, private_key, server_certificate, mode)
172
173 1
    def set_security(self, policy, certificate_path, private_key_path,
                     server_certificate_path=None,
                     mode=ua.MessageSecurityMode.SignAndEncrypt):
        """
        Set SecureConnection mode.
        When no server certificate path is given, the certificate is taken
        from the matching endpoint announced by the server, which requires
        a temporary connection.
        Call this before connect()
        """
        if server_certificate_path is not None:
            server_cert = uacrypto.load_certificate(server_certificate_path)
        else:
            # load certificate from server's list of endpoints
            announced = self.connect_and_get_server_endpoints()
            matching = Client.find_endpoint(announced, mode, policy.URI)
            server_cert = uacrypto.x509_from_der(matching.ServerCertificate)
        client_cert = uacrypto.load_certificate(certificate_path)
        private_key = uacrypto.load_private_key(private_key_path)
        self.security_policy = policy(server_cert, client_cert, private_key, mode)
        self.uaclient.set_security(self.security_policy)
191
192 1
    def load_client_certificate(self, path):
        """
        Load the user certificate from a pem or der file and keep it
        in self.user_certificate
        """
        certificate = uacrypto.load_certificate(path)
        self.user_certificate = certificate
197
198 1
    def load_private_key(self, path):
        """
        Load the user private key from file and keep it in
        self.user_private_key. It is used to sign the challenge when
        authenticating with a certificate
        """
        private_key = uacrypto.load_private_key(path)
        self.user_private_key = private_key
203
204 1
    def connect_and_get_server_endpoints(self):
        """
        Open a temporary connection, return the endpoints announced by the
        server. The socket is disconnected in every case.
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            announced = self.get_endpoints()
            self.close_secure_channel()
            return announced
        finally:
            self.disconnect_socket()
217
218 1
    def connect_and_find_servers(self):
        """
        Open a temporary connection, return the servers known by the
        server. The socket is disconnected in every case.
        """
        self.connect_socket()
        try:
            self.send_hello()
            # spec says it should not be necessary to open channel
            self.open_secure_channel()
            known_servers = self.find_servers()
            self.close_secure_channel()
            return known_servers
        finally:
            self.disconnect_socket()
231
232 1
    def connect_and_find_servers_on_network(self):
        """
        Open a temporary connection, return the servers on network known
        by the server. The socket is disconnected in every case.
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            network_servers = self.find_servers_on_network()
            self.close_secure_channel()
            return network_servers
        finally:
            self.disconnect_socket()
245
246 1
    def connect(self):
        """
        High level method
        Connect, create and activate session

        If a step fails, what has been opened so far is released again
        (session, then secure channel, then socket) and the exception
        is re-raised.
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            try:
                self.create_session()
                try:
                    self.activate_session(username=self._username, password=self._password, certificate=self.user_certificate)
                except Exception:
                    # close_session() also stops the keepalive thread
                    # started by create_session()
                    self.close_session()
                    raise
            except Exception:
                self.close_secure_channel()  # clean up the secure channel
                raise
        except Exception:
            self.disconnect_socket()  # clean up open socket
            raise
260
261 1
    def disconnect(self):
        """
        High level method
        Close the session, then the secure channel. The socket is
        disconnected even if one of these steps raises.
        """
        try:
            self.close_session()
            self.close_secure_channel()
        finally:
            # always release the socket
            self.disconnect_socket()
271
272 1
    def connect_socket(self):
        """
        Connect the socket to the host and port of the server url
        """
        target = self.server_url
        self.uaclient.connect_socket(target.hostname, target.port)
277
278 1
    def disconnect_socket(self):
        """
        Disconnect the socket of the low level client
        """
        self.uaclient.disconnect_socket()
280
281 1
    def send_hello(self):
        """
        Send OPC-UA hello to server, announcing the server url and the
        configured message size and chunk count limits
        """
        # FIXME check the ack returned by the server
        self.uaclient.send_hello(self.server_url.geturl(), self.max_messagesize, self.max_chunkcount)
287
288 1
    def open_secure_channel(self, renew=False):
        """
        Open secure channel, if renew is True, renew channel
        The lifetime revised by the server is stored in
        self.secure_channel_timeout
        """
        # length should be equal to the length of key of symmetric encryption
        client_nonce = utils.create_nonce(self.security_policy.symmetric_key_size)

        request = ua.OpenSecureChannelParameters()
        request.ClientProtocolVersion = 0
        if renew:
            request.RequestType = ua.SecurityTokenRequestType.Renew
        else:
            request.RequestType = ua.SecurityTokenRequestType.Issue
        request.SecurityMode = self.security_policy.Mode
        request.RequestedLifetime = self.secure_channel_timeout
        request.ClientNonce = client_nonce  # this nonce is used to create a symmetric key

        answer = self.uaclient.open_secure_channel(request)
        self.security_policy.make_symmetric_key(client_nonce, answer.ServerNonce)
        self.secure_channel_timeout = answer.SecurityToken.RevisedLifetime
305
306 1
    def close_secure_channel(self):
        """
        Close the secure channel, return the result of the low level client
        """
        result = self.uaclient.close_secure_channel()
        return result
308
309 1
    def get_endpoints(self):
        """
        Send a GetEndpoints request for the server url, return the answer
        of the low level client
        """
        request = ua.GetEndpointsParameters()
        request.EndpointUrl = self.server_url.geturl()
        return self.uaclient.get_endpoints(request)
313
314 1
    def register_server(self, server, discovery_configuration=None):
        """
        register a server to discovery server
        if discovery_configuration is provided, the newer register_server2 service call is used
        """
        registration = ua.RegisteredServer()
        registration.ServerUri = server.application_uri
        registration.ProductUri = server.product_uri
        registration.DiscoveryUrls = [server.endpoint.geturl()]
        registration.ServerType = server.application_type
        registration.ServerNames = [ua.LocalizedText(server.name)]
        registration.IsOnline = True

        if not discovery_configuration:
            return self.uaclient.register_server(registration)

        request = ua.RegisterServer2Parameters()
        request.Server = registration
        request.DiscoveryConfiguration = discovery_configuration
        return self.uaclient.register_server2(request)
333
334 1
    def find_servers(self, uris=None):
        """
        send a FindServer request to the server. The answer should be a list of
        servers the server knows about
        A list of uris can be provided, only server having matching uris will be returned
        """
        request = ua.FindServersParameters()
        request.EndpointUrl = self.server_url.geturl()
        # no filter is sent as an empty list
        request.ServerUris = [] if uris is None else uris
        return self.uaclient.find_servers(request)
346
347 1
    def find_servers_on_network(self):
        """
        Send a FindServersOnNetwork request with default parameters,
        return the answer of the low level client
        """
        request = ua.FindServersOnNetworkParameters()
        return self.uaclient.find_servers_on_network(request)
350
351 1
    def create_session(self):
        """
        send a CreateSessionRequest to server with reasonable parameters.
        If you want to modify settings look at code of this method
        and make your own

        The requested session timeout is self.session_timeout, which is
        replaced by the value revised by the server. The server signature
        is verified, the user token policies of the matching endpoint are
        remembered for activate_session() and the keepalive thread is
        started. Raise ua.UaError if the certificate sent by the server
        differs from the one already known.
        """
        desc = ua.ApplicationDescription()
        desc.ApplicationUri = self.application_uri
        desc.ProductUri = self.product_uri
        desc.ApplicationName = ua.LocalizedText(self.name)
        desc.ApplicationType = ua.ApplicationType.Client

        params = ua.CreateSessionParameters()
        # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
        nonce = utils.create_nonce(32)
        params.ClientNonce = nonce
        params.ClientCertificate = self.security_policy.client_certificate
        params.ClientDescription = desc
        params.EndpointUrl = self.server_url.geturl()
        params.SessionName = self.description + " Session" + str(self._session_counter)
        # honour the timeout configured on the client instead of a fixed value
        params.RequestedSessionTimeout = self.session_timeout
        params.MaxResponseMessageSize = 0  # means no max size
        response = self.uaclient.create_session(params)

        # the server signs our certificate (if any) followed by our nonce
        if self.security_policy.client_certificate is None:
            data = nonce
        else:
            data = self.security_policy.client_certificate + nonce
        self.security_policy.asymmetric_cryptography.verify(data, response.ServerSignature.Signature)
        self._server_nonce = response.ServerNonce

        if not self.security_policy.server_certificate:
            self.security_policy.server_certificate = response.ServerCertificate
        elif self.security_policy.server_certificate != response.ServerCertificate:
            raise ua.UaError("Server certificate mismatch")

        # remember PolicyId's: we will use them in activate_session()
        ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
        self._policy_ids = ep.UserIdentityTokens
        self.session_timeout = response.RevisedSessionTimeout
        self.keepalive = KeepAlive(
            self, min(self.session_timeout, self.secure_channel_timeout) * 0.7)  # 0.7 is from spec
        self.keepalive.start()
        return response
392
393 1
    def server_policy_id(self, token_type, default):
        """
        Return the PolicyId of the first remembered UserTokenPolicy having
        the given token_type, or default if there is none.
        """
        matching_ids = (
            policy.PolicyId
            for policy in self._policy_ids
            if policy.TokenType == token_type
        )
        return next(matching_ids, default)
402
403 1
    def server_policy_uri(self, token_type):
        """
        Return the SecurityPolicyUri of the first remembered
        UserTokenPolicy having the given token_type.
        The policy URI of the current security policy is returned when
        that URI is empty or when no policy matches.
        """
        endpoint_uri = self.security_policy.URI
        for candidate in self._policy_ids:
            if candidate.TokenType != token_type:
                continue
            # empty URI means "use this endpoint's policy URI"
            return candidate.SecurityPolicyUri or endpoint_uri
        return endpoint_uri
416
417 1
    def activate_session(self, username=None, password=None, certificate=None):
        """
        Activate the session.
        A certificate, when given, is used to authenticate; otherwise the
        username and password are used; without any of them the session
        is activated anonymously.
        """
        # data to sign: server certificate followed by the last server nonce
        to_sign = b""
        if self.security_policy.server_certificate is not None:
            to_sign += self.security_policy.server_certificate
        if self._server_nonce is not None:
            to_sign += self._server_nonce

        request = ua.ActivateSessionParameters()
        request.ClientSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        request.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(to_sign)
        request.LocaleIds.append("en")

        if certificate:
            self._add_certificate_auth(request, certificate, to_sign)
        elif username:
            self._add_user_auth(request, username, password)
        else:
            self._add_anonymous_auth(request)
        return self.uaclient.activate_session(request)
437
438 1
    def _add_anonymous_auth(self, params):
        """
        Fill params with an anonymous identity token, "anonymous" is the
        PolicyId used when the server announced none for this token type
        """
        params.UserIdentityToken = ua.AnonymousIdentityToken()
        policy_id = self.server_policy_id(ua.UserTokenType.Anonymous, "anonymous")
        params.UserIdentityToken.PolicyId = policy_id
441
442 1
    def _add_certificate_auth(self, params, certificate, challenge):
        """
        Fill params with an X509 identity token for certificate and with
        the signature of challenge made with self.user_private_key
        """
        params.UserIdentityToken = ua.X509IdentityToken()
        policy_id = self.server_policy_id(ua.UserTokenType.Certificate, "certificate_basic256")
        params.UserIdentityToken.PolicyId = policy_id
        params.UserIdentityToken.CertificateData = uacrypto.der_from_x509(certificate)

        # specs part 4, 5.6.3.1: the data to sign is created by appending
        # the last serverNonce to the serverCertificate
        signed_challenge = uacrypto.sign_sha1(self.user_private_key, challenge)
        params.UserTokenSignature = ua.SignatureData()
        params.UserTokenSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.UserTokenSignature.Signature = signed_challenge
452
453 1
    def _add_user_auth(self, params, username, password):
        """
        Fill params with a user name identity token.

        :param params: ActivateSessionParameters to complete
        :param username: user name put in the token
        :param password: password of that user, may be None or empty,
            in which case no password is put in the token

        The password is sent UTF-8 encoded in plain text when the policy
        for user name tokens has no URI or the "none" policy URI,
        otherwise it is encrypted with _encrypt_password()
        """
        params.UserIdentityToken = ua.UserNameIdentityToken()
        params.UserIdentityToken.UserName = username
        policy_uri = self.server_policy_uri(ua.UserTokenType.UserName)
        if not policy_uri or policy_uri == security_policies.POLICY_NONE_URI:
            # see specs part 4, 7.36.3: if the token is NOT encrypted,
            # then the password only contains UTF-8 encoded password
            # and EncryptionAlgorithm is null
            # test the argument that is actually encoded, not self._password
            if password:
                self.logger.warning("Sending plain-text password")
                params.UserIdentityToken.Password = password.encode('utf8')
            params.UserIdentityToken.EncryptionAlgorithm = None
        elif password:
            data, uri = self._encrypt_password(password, policy_uri)
            params.UserIdentityToken.Password = data
            params.UserIdentityToken.EncryptionAlgorithm = uri
        # default is a text string, like the defaults of the other token types
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, "username_basic256")
470
471 1
    def _encrypt_password(self, password, policy_uri):
        """
        Encrypt password for the server with the public key of the server
        certificate. Return the encrypted data and the uri returned by
        security_policies.encrypt_asymmetric
        """
        server_cert = uacrypto.x509_from_der(self.security_policy.server_certificate)
        server_key = server_cert.public_key()
        # see specs part 4, 7.36.3: if the token is encrypted, password
        # shall be converted to UTF-8 and serialized with server nonce
        secret = password.encode("utf8")
        if self._server_nonce is not None:
            secret += self._server_nonce
        packed_secret = ua.ua_binary.Primitives.Bytes.pack(secret)
        encrypted, algorithm_uri = security_policies.encrypt_asymmetric(server_key, packed_secret, policy_uri)
        return encrypted, algorithm_uri
481
482 1
    def close_session(self):
        """
        Stop the keepalive thread if it is running and wait for its end,
        then close the session
        """
        keepalive = self.keepalive
        if keepalive and keepalive.is_alive():
            keepalive.stop()
            keepalive.join()
        return self.uaclient.close_session(True)
490
491 1
    def get_root_node(self):
        """
        Get the node identified by ObjectIds.RootFolder
        """
        root_id = ua.TwoByteNodeId(ua.ObjectIds.RootFolder)
        return self.get_node(root_id)
493
494 1
    def get_objects_node(self):
        """
        Get the node identified by ObjectIds.ObjectsFolder
        """
        objects_id = ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder)
        return self.get_node(objects_id)
496
497 1
    def get_server_node(self):
        """
        Get the node identified by ObjectIds.Server
        """
        server_id = ua.FourByteNodeId(ua.ObjectIds.Server)
        return self.get_node(server_id)
499
500 1
    def get_node(self, nodeid):
        """
        Build a Node bound to this client from a NodeId object or from
        a string representing a NodeId
        """
        node = Node(self.uaclient, nodeid)
        return node
505
506 1
    def create_subscription(self, period, handler):
        """
        Create a subscription.
        returns a Subscription object which allow
        to subscribe to events or data on server

        period argument is either a publishing interval in milliseconds or a
        CreateSubscriptionParameters instance. The second option should be used,
        if the opcua-server has problems with the default options.

        handler argument is a class with data_change and/or event methods.
        These methods will be called when notification from server are received.
        See example-client.py.
        Do not do expensive/slow or network operation from these methods
        since they are called directly from receiving thread. This is a design choice,
        start another thread if you need to do such a thing.
        """
        if isinstance(period, ua.CreateSubscriptionParameters):
            # caller supplied ready-made parameters, use them untouched
            settings = period
        else:
            settings = ua.CreateSubscriptionParameters()
            settings.RequestedPublishingInterval = period
            settings.RequestedLifetimeCount = 10000
            settings.RequestedMaxKeepAliveCount = 3000
            settings.MaxNotificationsPerPublish = 10000
            settings.PublishingEnabled = True
            settings.Priority = 0
        return Subscription(self.uaclient, settings, handler)
532
533 1
    def get_namespace_array(self):
        """
        Return the value of the Server_NamespaceArray node
        """
        array_id = ua.NodeId(ua.ObjectIds.Server_NamespaceArray)
        return self.get_node(array_id).get_value()
536
537 1
    def get_namespace_index(self, uri):
        """
        Return the position of uri in the namespace array of the server
        """
        return self.get_namespace_array().index(uri)
540
541 1
    def delete_nodes(self, nodes, recursive=False):
        """
        Delete nodes through the module level delete_nodes function,
        return its result
        """
        result = delete_nodes(self.uaclient, nodes, recursive)
        return result
543
544 1
    def import_xml(self, path=None, xmlstring=None):
        """
        Import nodes defined in xml, given as a path or as a string.
        Return the result of XmlImporter.import_xml
        """
        return XmlImporter(self).import_xml(path, xmlstring)
550
551 1
    def export_xml(self, nodes, path):
        """
        Export the given nodes to the xml file at path.
        Return the result of XmlExporter.write_xml
        """
        exporter = XmlExporter(self)
        exporter.build_etree(nodes)
        return exporter.write_xml(path)
558
559 1
    def register_namespace(self, uri):
        """
        Register a new namespace. Nodes should be in a custom namespace, not 0.
        This method is mainly implemented for symmetry with server.
        Return the index of uri in the namespace array; the array on the
        server is only written when uri is not yet part of it.
        """
        array_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        namespaces = array_node.get_value()
        if uri not in namespaces:
            namespaces.append(uri)
            array_node.set_value(namespaces)
            return len(namespaces) - 1
        return namespaces.index(uri)
571
572 1
    def load_type_definitions(self, nodes=None):
        """
        Load custom types (custom structures/extension objects) definition from server
        Generate Python classes for custom structures/extension objects defined in server
        These classes will be available in ua module
        """
        generated = load_type_definitions(self, nodes)
        return generated
579
580 1
    def load_enums(self):
        """
        generate Python enums for custom enums on server.
        These enums will be available in ua module
        """
        generated = load_enums(self)
        return generated
586