Completed
Pull Request — master (#517)
by
unknown
05:40
created

Client.set_max_messagesize()   A

Complexity

Conditions 1

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.125

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 5
ccs 1
cts 2
cp 0.5
crap 1.125
rs 9.4285
1 1
from __future__ import division  # support for python2
2 1
from threading import Thread, Condition
3 1
import logging
4 1
try:
5 1
    from urllib.parse import urlparse
6 1
except ImportError:  # support for python2
7 1
    from urlparse import urlparse
8
9 1
from opcua import ua
10 1
from opcua.client.ua_client import UaClient
11 1
from opcua.common.xmlimporter import XmlImporter
12 1
from opcua.common.xmlexporter import XmlExporter
13 1
from opcua.common.node import Node
14 1
from opcua.common.manage_nodes import delete_nodes
15 1
from opcua.common.subscription import Subscription
16 1
from opcua.common import utils
17 1
from opcua.crypto import security_policies
18 1
from opcua.common.shortcuts import Shortcuts
19 1
from opcua.common.structures import load_type_definitions
20 1
use_crypto = True
21 1
try:
22 1
    from opcua.crypto import uacrypto
23
except ImportError:
24
    print("cryptography is not installed, use of crypto disabled")
25
    use_crypto = False
26
27
28 1
class KeepAlive(Thread):

    """
    Used by Client to keep the session open.
    OPCUA defines timeout both for sessions and secure channel
    """

    def __init__(self, client, timeout):
        """
        :param client: object whose open_secure_channel(renew=True) and
            get_node() methods are called from the thread.
        :param timeout: period between two renewals, in milliseconds.
            A value of 0 is replaced by one hour.
        """
        Thread.__init__(self)
        self.logger = logging.getLogger(__name__)

        self.client = client
        self._dostop = False
        self._cond = Condition()
        self.timeout = timeout

        # some server support no timeout, but we do not trust them
        if self.timeout == 0:
            self.timeout = 3600000  # 1 hour

    def run(self):
        """
        Renew the secure channel and read the server state once per period,
        until stop() is called.
        """
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
        server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
        while True:
            with self._cond:
                # The flag is tested while holding the lock that stop() also
                # takes, so a stop request issued just before we start
                # waiting cannot be missed (no lost wake-up).
                if not self._dostop:
                    self._cond.wait(self.timeout / 1000)  # wait() expects seconds
                if self._dostop:
                    break
            self.logger.debug("renewing channel")
            self.client.open_secure_channel(renew=True)
            val = server_state.get_value()
            self.logger.debug("server state is: %s ", val)
        self.logger.debug("keepalive thread has stopped")

    def stop(self):
        """
        Ask the thread to terminate and wake it up if it is waiting.
        Does not wait for the thread to finish.
        """
        self.logger.debug("stoping keepalive thread")
        with self._cond:
            # set the flag under the lock so run() sees it before waiting
            self._dostop = True
            self._cond.notify_all()
71
72
73 1
class Client(object):

    """
    High level client to connect to an OPC-UA server.

    This class makes it easy to connect and browse address space.
    It attempts to expose as much functionality as possible
    but if you want more flexibility it is possible and advised to
    use UaClient object, available as self.uaclient
    which offers the raw OPC-UA services interface.
    """

    def __init__(self, url, timeout=4):
        """

        :param url: url of the server.
            if you are unsure of url, write at least hostname
            and port and call get_endpoints

        :param timeout:
            Each request sent to the server expects an answer within this
            time. The timeout is specified in seconds.
        """
        self.logger = logging.getLogger(__name__)
        self.server_url = urlparse(url)
        # take initial username and password from the url
        self._username = self.server_url.username
        self._password = self.server_url.password
        self.name = "Pure Python Client"
        self.description = self.name
        self.application_uri = "urn:freeopcua:client"
        self.product_uri = "urn:freeopcua.github.no:client"
        self.security_policy = ua.SecurityPolicy()
        self.secure_channel_id = None
        self.secure_channel_timeout = 3600000  # 1 hour
        self.session_timeout = 3600000  # 1 hour
        self._policy_ids = []
        self.uaclient = UaClient(timeout)
        self.user_certificate = None
        self.user_private_key = None
        self._server_nonce = None
        self._session_counter = 1
        self.keepalive = None
        self.nodes = Shortcuts(self.uaclient)
        self._maxMessageSize = 0  # No limits
        self._maxChunkCount = 0  # No limits

    def __enter__(self):
        """
        Connect when entering a ``with`` block and return the client.
        """
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """
        Disconnect when leaving a ``with`` block.
        """
        self.disconnect()

    @staticmethod
    def find_endpoint(endpoints, security_mode, policy_uri):
        """
        Find endpoint with required security mode and policy URI

        Only endpoints whose url starts with ua.OPC_TCP_SCHEME are considered.
        Raises ua.UaError if no endpoint matches.
        """
        for ep in endpoints:
            if (ep.EndpointUrl.startswith(ua.OPC_TCP_SCHEME) and
                    ep.SecurityMode == security_mode and
                    ep.SecurityPolicyUri == policy_uri):
                return ep
        raise ua.UaError("No matching endpoints: {0}, {1}".format(security_mode, policy_uri))

    def set_user(self, username):
        """
        Set user name for the connection.
        initial user from the URL will be overwritten
        """
        self._username = username

    def set_password(self, pwd):
        """
        Set user password for the connection.
        initial password from the URL will be overwritten
        """
        self._password = pwd

    def set_max_messagesize(self, maxMessageSize):
        """
        Set the max messagesize for the connection.
        The value is passed to the server by send_hello(); the initial
        value is 0, meaning no limit.
        """
        self._maxMessageSize = maxMessageSize

    def set_max_chunkcount(self, maxChunkCount):
        """
        Set the max chunkcount for the connection.
        The value is passed to the server by send_hello(); the initial
        value is 0, meaning no limit.
        """
        self._maxChunkCount = maxChunkCount

    def set_security_string(self, string):
        """
        Set SecureConnection mode. String format:
        Policy,Mode,certificate,private_key[,server_private_key]
        where Policy is Basic128Rsa15 or Basic256,
            Mode is Sign or SignAndEncrypt
            certificate, private_key and server_private_key are
                paths to .pem or .der files
        Call this before connect()

        An empty string is ignored. Raises ua.UaError when fewer than
        4 comma-separated values are given.
        """
        if not string:
            return
        parts = string.split(',')
        if len(parts) < 4:
            raise ua.UaError('Wrong format: `{0}`, expected at least 4 comma-separated values'.format(string))
        policy_class = getattr(security_policies, 'SecurityPolicy' + parts[0])
        mode = getattr(ua.MessageSecurityMode, parts[1])
        # the fifth value is optional and is handed to set_security()
        # as server_certificate_path
        return self.set_security(policy_class, parts[2], parts[3],
                                 parts[4] if len(parts) >= 5 else None, mode)

    def set_security(self, policy, certificate_path, private_key_path,
                     server_certificate_path=None,
                     mode=ua.MessageSecurityMode.SignAndEncrypt):
        """
        Set SecureConnection mode.
        Call this before connect()

        When server_certificate_path is None, the server certificate is
        taken from the endpoint matching mode and policy.URI, which is
        fetched with connect_and_get_server_endpoints().
        """
        if server_certificate_path is None:
            # load certificate from server's list of endpoints
            endpoints = self.connect_and_get_server_endpoints()
            endpoint = Client.find_endpoint(endpoints, mode, policy.URI)
            server_cert = uacrypto.x509_from_der(endpoint.ServerCertificate)
        else:
            server_cert = uacrypto.load_certificate(server_certificate_path)
        cert = uacrypto.load_certificate(certificate_path)
        pk = uacrypto.load_private_key(private_key_path)
        self.security_policy = policy(server_cert, cert, pk, mode)
        self.uaclient.set_security(self.security_policy)

    def load_client_certificate(self, path):
        """
        load our certificate from file, either pem or der
        """
        self.user_certificate = uacrypto.load_certificate(path)

    def load_private_key(self, path):
        """
        Load user private key. This is used for authenticating using certificate
        """
        self.user_private_key = uacrypto.load_private_key(path)

    def _query_over_temporary_channel(self, query):
        """
        Connect, open a secure channel, run query(), and disconnect.

        The socket is always closed, even when one of the steps raises,
        so a failed request does not leave a connection behind.
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            result = query()
            self.close_secure_channel()
        finally:
            self.disconnect_socket()
        return result

    def connect_and_get_server_endpoints(self):
        """
        Connect, ask server for endpoints, and disconnect
        """
        return self._query_over_temporary_channel(self.get_endpoints)

    def connect_and_find_servers(self):
        """
        Connect, ask server for a list of known servers, and disconnect
        """
        # spec says it should not be necessary to open channel
        return self._query_over_temporary_channel(self.find_servers)

    def connect_and_find_servers_on_network(self):
        """
        Connect, ask server for a list of known servers on network, and disconnect
        """
        return self._query_over_temporary_channel(self.find_servers_on_network)

    def connect(self):
        """
        High level method
        Connect, create and activate session
        """
        self.connect_socket()
        self.send_hello()
        self.open_secure_channel()
        self.create_session()
        self.activate_session(username=self._username, password=self._password, certificate=self.user_certificate)

    def disconnect(self):
        """
        High level method
        Close session, secure channel and socket
        """
        try:
            self.close_session()
            self.close_secure_channel()
        finally:
            # the socket is closed even if the server could not be told
            self.disconnect_socket()

    def connect_socket(self):
        """
        connect to socket defined in url
        """
        self.uaclient.connect_socket(self.server_url.hostname, self.server_url.port)

    def disconnect_socket(self):
        """
        Close the socket held by the underlying UaClient.
        """
        self.uaclient.disconnect_socket()

    def send_hello(self):
        """
        Send OPC-UA hello to server
        """
        # FIXME: the acknowledge returned by the server is not checked
        self.uaclient.send_hello(self.server_url.geturl(), self._maxMessageSize, self._maxChunkCount)

    def open_secure_channel(self, renew=False):
        """
        Open secure channel, if renew is True, renew channel
        """
        params = ua.OpenSecureChannelParameters()
        params.ClientProtocolVersion = 0
        params.RequestType = ua.SecurityTokenRequestType.Issue
        if renew:
            params.RequestType = ua.SecurityTokenRequestType.Renew
        params.SecurityMode = self.security_policy.Mode
        params.RequestedLifetime = self.secure_channel_timeout
        # length should be equal to the length of key of symmetric encryption
        nonce = utils.create_nonce(self.security_policy.symmetric_key_size)
        params.ClientNonce = nonce  # this nonce is used to create a symmetric key
        result = self.uaclient.open_secure_channel(params)
        self.security_policy.make_symmetric_key(nonce, result.ServerNonce)
        # remember the lifetime actually granted by the server
        self.secure_channel_timeout = result.SecurityToken.RevisedLifetime

    def close_secure_channel(self):
        """
        Close the secure channel through the underlying UaClient.
        """
        return self.uaclient.close_secure_channel()

    def get_endpoints(self):
        """
        Send a GetEndpoints request for the configured server url.
        """
        params = ua.GetEndpointsParameters()
        params.EndpointUrl = self.server_url.geturl()
        return self.uaclient.get_endpoints(params)

    def register_server(self, server, discovery_configuration=None):
        """
        register a server to discovery server
        if discovery_configuration is provided, the newer register_server2 service call is used
        """
        serv = ua.RegisteredServer()
        serv.ServerUri = server.application_uri
        serv.ProductUri = server.product_uri
        serv.DiscoveryUrls = [server.endpoint.geturl()]
        serv.ServerType = server.application_type
        serv.ServerNames = [ua.LocalizedText(server.name)]
        serv.IsOnline = True
        if discovery_configuration:
            params = ua.RegisterServer2Parameters()
            params.Server = serv
            params.DiscoveryConfiguration = discovery_configuration
            return self.uaclient.register_server2(params)
        return self.uaclient.register_server(serv)

    def find_servers(self, uris=None):
        """
        send a FindServer request to the server. The answer should be a list of
        servers the server knows about
        A list of uris can be provided, only server having matching uris will be returned
        """
        if uris is None:
            uris = []
        params = ua.FindServersParameters()
        params.EndpointUrl = self.server_url.geturl()
        params.ServerUris = uris
        return self.uaclient.find_servers(params)

    def find_servers_on_network(self):
        """
        Send a FindServersOnNetwork request with default parameters.
        """
        params = ua.FindServersOnNetworkParameters()
        return self.uaclient.find_servers_on_network(params)

    def create_session(self):
        """
        send a CreateSessionRequest to server with reasonable parameters.
        If you want to modify settings look at code of this method
        and make your own

        The server signature is verified, the user token policies of the
        matching endpoint are remembered for activate_session(), and a
        KeepAlive thread is started. Raises ua.UaError if the certificate
        returned by the server differs from the one already configured.
        """
        desc = ua.ApplicationDescription()
        desc.ApplicationUri = self.application_uri
        desc.ProductUri = self.product_uri
        desc.ApplicationName = ua.LocalizedText(self.name)
        desc.ApplicationType = ua.ApplicationType.Client

        params = ua.CreateSessionParameters()
        # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
        nonce = utils.create_nonce(32)
        params.ClientNonce = nonce
        params.ClientCertificate = self.security_policy.client_certificate
        params.ClientDescription = desc
        params.EndpointUrl = self.server_url.geturl()
        params.SessionName = self.description + " Session" + str(self._session_counter)
        # honour the public attribute (1 hour unless changed) instead of a
        # hard-coded copy of its default value
        params.RequestedSessionTimeout = self.session_timeout
        params.MaxResponseMessageSize = 0  # means no max size
        response = self.uaclient.create_session(params)
        if self.security_policy.client_certificate is None:
            data = nonce
        else:
            data = self.security_policy.client_certificate + nonce
        self.security_policy.asymmetric_cryptography.verify(data, response.ServerSignature.Signature)
        self._server_nonce = response.ServerNonce
        if not self.security_policy.server_certificate:
            self.security_policy.server_certificate = response.ServerCertificate
        elif self.security_policy.server_certificate != response.ServerCertificate:
            raise ua.UaError("Server certificate mismatch")
        # remember PolicyId's: we will use them in activate_session()
        ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
        self._policy_ids = ep.UserIdentityTokens
        self.session_timeout = response.RevisedSessionTimeout
        self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7)  # 0.7 is from spec
        self.keepalive.start()
        return response

    def server_policy_id(self, token_type, default):
        """
        Find PolicyId of server's UserTokenPolicy by token_type.
        Return default if there's no matching UserTokenPolicy.
        """
        for policy in self._policy_ids:
            if policy.TokenType == token_type:
                return policy.PolicyId
        return default

    def server_policy_uri(self, token_type):
        """
        Find SecurityPolicyUri of server's UserTokenPolicy by token_type.
        If SecurityPolicyUri is empty, use default SecurityPolicyUri
        of the endpoint
        """
        for policy in self._policy_ids:
            if policy.TokenType == token_type:
                # empty URI means "use this endpoint's policy URI"
                return policy.SecurityPolicyUri or self.security_policy.URI
        return self.security_policy.URI

    def activate_session(self, username=None, password=None, certificate=None):
        """
        Activate session using either username and password or private_key

        Without username and certificate an anonymous token is sent;
        a certificate takes precedence over a username.
        """
        params = ua.ActivateSessionParameters()
        challenge = b""
        if self.security_policy.server_certificate is not None:
            challenge += self.security_policy.server_certificate
        if self._server_nonce is not None:
            challenge += self._server_nonce
        params.ClientSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(challenge)
        params.LocaleIds.append("en")
        if not username and not certificate:
            self._add_anonymous_auth(params)
        elif certificate:
            self._add_certificate_auth(params, certificate, challenge)
        else:
            self._add_user_auth(params, username, password)
        return self.uaclient.activate_session(params)

    def _add_anonymous_auth(self, params):
        """
        Put an anonymous identity token into params.
        """
        params.UserIdentityToken = ua.AnonymousIdentityToken()
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, b"anonymous")

    def _add_certificate_auth(self, params, certificate, challenge):
        """
        Put an X509 identity token into params and sign challenge
        with self.user_private_key.
        """
        params.UserIdentityToken = ua.X509IdentityToken()
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, b"certificate_basic256")
        params.UserIdentityToken.CertificateData = uacrypto.der_from_x509(certificate)
        # specs part 4, 5.6.3.1: the data to sign is created by appending
        # the last serverNonce to the serverCertificate
        sig = uacrypto.sign_sha1(self.user_private_key, challenge)
        params.UserTokenSignature = ua.SignatureData()
        params.UserTokenSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.UserTokenSignature.Signature = sig

    def _add_user_auth(self, params, username, password):
        """
        Put a user name identity token into params.

        The password is sent as given when the user token policy is empty
        or POLICY_NONE_URI, and encrypted with _encrypt_password() otherwise.
        """
        params.UserIdentityToken = ua.UserNameIdentityToken()
        params.UserIdentityToken.UserName = username
        policy_uri = self.server_policy_uri(ua.UserTokenType.UserName)
        # The decisions below are taken on the password handed to this call,
        # not on self._password: the two differ when activate_session() is
        # called directly with its own credentials.
        if not policy_uri or policy_uri == security_policies.POLICY_NONE_URI:
            # see specs part 4, 7.36.3: if the token is NOT encrypted,
            # then the password only contains UTF-8 encoded password
            # and EncryptionAlgorithm is null
            if password:
                self.logger.warning("Sending plain-text password")
                params.UserIdentityToken.Password = password
            params.UserIdentityToken.EncryptionAlgorithm = None
        elif password:
            data, uri = self._encrypt_password(password, policy_uri)
            params.UserIdentityToken.Password = data
            params.UserIdentityToken.EncryptionAlgorithm = uri
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, b"username_basic256")

    def _encrypt_password(self, password, policy_uri):
        """
        Encrypt password with the public key of the server certificate.
        Returns the encrypted data and the uri of the algorithm used.
        """
        pubkey = uacrypto.x509_from_der(self.security_policy.server_certificate).public_key()
        # see specs part 4, 7.36.3: if the token is encrypted, password
        # shall be converted to UTF-8 and serialized with server nonce
        passwd = password.encode("utf8")
        if self._server_nonce is not None:
            passwd += self._server_nonce
        etoken = ua.ua_binary.Primitives.Bytes.pack(passwd)
        data, uri = security_policies.encrypt_asymmetric(pubkey, etoken, policy_uri)
        return data, uri

    def close_session(self):
        """
        Close session

        The keepalive thread, if any, is asked to stop first.
        """
        if self.keepalive:
            self.keepalive.stop()
        return self.uaclient.close_session(True)

    def get_root_node(self):
        """
        Return the Node for ua.ObjectIds.RootFolder.
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))

    def get_objects_node(self):
        """
        Return the Node for ua.ObjectIds.ObjectsFolder.
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))

    def get_server_node(self):
        """
        Return the Node for ua.ObjectIds.Server.
        """
        return self.get_node(ua.FourByteNodeId(ua.ObjectIds.Server))

    def get_node(self, nodeid):
        """
        Get node using NodeId object or a string representing a NodeId
        """
        return Node(self.uaclient, nodeid)

    def create_subscription(self, period, handler):
        """
        Create a subscription.
        returns a Subscription object which allow
        to subscribe to events or data on server
        handler argument is a class with data_change and/or event methods.
        period argument is either a publishing interval in milliseconds or a
        CreateSubscriptionParameters instance. The second option should be used,
        if the opcua-server has problems with the default options.
        These methods will be called when notification from server are received.
        See example-client.py.
        Do not do expensive/slow or network operation from these methods
        since they are called directly from receiving thread. This is a design choice,
        start another thread if you need to do such a thing.
        """

        if isinstance(period, ua.CreateSubscriptionParameters):
            # caller supplied complete parameters: use them untouched
            return Subscription(self.uaclient, period, handler)
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = period
        params.RequestedLifetimeCount = 10000
        params.RequestedMaxKeepAliveCount = 3000
        params.MaxNotificationsPerPublish = 10000
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.uaclient, params, handler)

    def get_namespace_array(self):
        """
        Return the value of the Server_NamespaceArray node.
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        return ns_node.get_value()

    def get_namespace_index(self, uri):
        """
        Return the position of uri in the namespace array.
        """
        uries = self.get_namespace_array()
        return uries.index(uri)

    def delete_nodes(self, nodes, recursive=False):
        """
        Delete nodes through opcua.common.manage_nodes.delete_nodes.
        """
        return delete_nodes(self.uaclient, nodes, recursive)

    def import_xml(self, path):
        """
        Import nodes defined in xml
        """
        importer = XmlImporter(self)
        return importer.import_xml(path)

    def export_xml(self, nodes, path):
        """
        Export defined nodes to xml
        """
        exp = XmlExporter(self)
        exp.build_etree(nodes)
        return exp.write_xml(path)

    def register_namespace(self, uri):
        """
        Register a new namespace. Nodes should be in a custom namespace, not 0.
        This method is mainly implemented for symmetry with server

        Returns the index of uri in the namespace array; the array is
        only written back when uri was not already present.
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        uries = ns_node.get_value()
        if uri in uries:
            return uries.index(uri)
        uries.append(uri)
        ns_node.set_value(uries)
        return len(uries) - 1

    def load_type_definitions(self, nodes=None):
        """
        Delegate to opcua.common.structures.load_type_definitions.
        """
        return load_type_definitions(self, nodes)
575
576
577