Completed
Push — master ( 52223d...eab167 )
by Olivier
01:29
created

Client.__enter__()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.2963

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 3
ccs 1
cts 3
cp 0.3333
crap 1.2963
rs 10
1 1
from __future__ import division  # support for python2
2 1
from threading import Thread, Condition
3 1
import logging
4 1
try:
5 1
    from urllib.parse import urlparse
6
except ImportError:  # support for python2
7
    from urlparse import urlparse
8
9 1
from opcua import ua
10 1
from opcua.client.ua_client import UaClient
11 1
from opcua.common.node import Node
12 1
from opcua.common.manage_nodes import delete_nodes
13 1
from opcua.common.subscription import Subscription
14 1
from opcua.common import utils
15 1
from opcua.crypto import security_policies
16 1
use_crypto = True
17 1
try:
18 1
    from opcua.crypto import uacrypto
19 1
except ImportError:
20 1
    print("cryptography is not installed, use of crypto disabled")
21 1
    use_crypto = False
22
23
24 1
class KeepAlive(Thread):

    """
    Used by Client to keep session opened.
    OPCUA defines timeout both for sessions and secure channel

    Once started, the thread wakes up every ``timeout`` milliseconds,
    renews the secure channel of the client and reads the server state
    node. It keeps doing so until stop() is called.
    """

    def __init__(self, client, timeout):
        """
        client is the object used to renew the channel; it must provide
        get_node() and open_secure_channel(renew=True)
        timeout is the keepalive period in milliseconds. A value of 0
        is replaced by 360000
        """
        Thread.__init__(self)
        self.logger = logging.getLogger(__name__)
        if timeout == 0:  # means no timeout but we do not trust such servers
            timeout = 360000
        self.timeout = timeout
        self.client = client
        self._dostop = False
        self._cond = Condition()

    def run(self):
        """
        Thread body: wait one period, then renew the channel and read the
        server state. Returns as soon as a stop request is seen.
        """
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
        server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
        while True:
            with self._cond:
                # The flag is tested while holding the lock, and stop() sets
                # it while holding the same lock. A stop request can therefore
                # never slip in between the test and the wait and be missed.
                if not self._dostop:
                    self._cond.wait(self.timeout / 1000.0)  # milliseconds -> seconds
                if self._dostop:
                    break
            self.logger.debug("renewing channel")
            self.client.open_secure_channel(renew=True)
            val = server_state.get_value()
            self.logger.debug("server state is: %s ", val)
        self.logger.debug("keepalive thread has stopped")

    def stop(self):
        """
        Ask the thread to stop and wake it up if it is currently waiting.
        Does not wait for the thread to terminate.
        """
        self.logger.debug("stoping keepalive thread")
        with self._cond:
            # set the flag under the lock, see comment in run()
            self._dostop = True
            self._cond.notify_all()
60
61
62 1
class Client(object):

    """
    High level client to connect to an OPC-UA server.

    This class makes it easy to connect and browse address space.
    It attempts to expose as much functionality as possible
    but if you want more flexibility it is possible and advised to
    use UaClient object, available as self.uaclient
    which offers the raw OPC-UA services interface.

    The class can be used as a context manager: connect() is called
    when entering the with block and disconnect() when leaving it.
    """

    def __init__(self, url, timeout=4):
        """
        used url argument to connect to server.
        if you are unsure of url, write at least hostname and port
        and call get_endpoints
        timeout is the timeout to get an answer for requests to server
        public member of this call are available to be set by API users

        """
        self.logger = logging.getLogger(__name__)
        self.server_url = urlparse(url)
        self.name = "Pure Python Client"
        self.description = self.name
        self.application_uri = "urn:freeopcua:client"
        self.product_uri = "urn:freeopcua.github.no:client"
        self.security_policy = ua.SecurityPolicy()
        self.secure_channel_id = None
        self.default_timeout = 3600000
        self.secure_channel_timeout = self.default_timeout
        self.session_timeout = self.default_timeout
        self._policy_ids = []
        self.uaclient = UaClient(timeout)
        self.user_certificate = None
        self.user_private_key = None
        # set by create_session(); initialised here so that activate_session()
        # can test it against None even if no session has been created yet
        self._server_nonce = None
        self._session_counter = 1
        self.keepalive = None

    def __enter__(self):
        """
        Connect to the server and return the client itself
        """
        self.connect()
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """
        Disconnect from the server when leaving a with block.
        The three arguments are the ones passed by the with statement.
        They default to None so that a direct call without arguments
        still works. An exception raised in the with block is not suppressed.
        """
        self.disconnect()

    @staticmethod
    def find_endpoint(endpoints, security_mode, policy_uri):
        """
        Find endpoint with required security mode and policy URI.
        Only endpoints whose url starts with ua.OPC_TCP_SCHEME are considered.
        Raise ua.UaError if no endpoint matches
        """
        for ep in endpoints:
            if (ep.EndpointUrl.startswith(ua.OPC_TCP_SCHEME) and
                    ep.SecurityMode == security_mode and
                    ep.SecurityPolicyUri == policy_uri):
                return ep
        raise ua.UaError("No matching endpoints: {}, {}".format(
                         security_mode, policy_uri))

    def set_security_string(self, string):
        """
        Set SecureConnection mode. String format:
        Policy,Mode,certificate,private_key[,server_certificate]
        where Policy is Basic128Rsa15 or Basic256,
            Mode is Sign or SignAndEncrypt
            certificate, private_key and server_certificate are
                paths to .pem or .der files
        Nothing is done if string is empty.
        Raise ua.UaError if less than 4 values are given
        Call this before connect()
        """
        if not string:
            return
        parts = string.split(',')
        if len(parts) < 4:
            raise ua.UaError('Wrong format: `{}`, expected at least 4 comma-separated values'.format(string))
        policy_class = getattr(security_policies, 'SecurityPolicy' + parts[0])
        mode = getattr(ua.MessageSecurityMode, parts[1])
        return self.set_security(policy_class, parts[2], parts[3],
                                 parts[4] if len(parts) >= 5 else None, mode)

    def set_security(self, policy, certificate_path, private_key_path,
                     server_certificate_path=None,
                     mode=ua.MessageSecurityMode.SignAndEncrypt):
        """
        Set SecureConnection mode.
        policy is a security policy class, it is instantiated with the
        server certificate, our certificate, our private key and mode.
        If server_certificate_path is None, the server certificate is taken
        from the matching endpoint returned by the server, which requires
        a temporary connection to the server.
        Call this before connect()
        """
        if server_certificate_path is None:
            # load certificate from server's list of endpoints
            endpoints = self.connect_and_get_server_endpoints()
            endpoint = Client.find_endpoint(endpoints, mode, policy.URI)
            server_cert = uacrypto.x509_from_der(endpoint.ServerCertificate)
        else:
            server_cert = uacrypto.load_certificate(server_certificate_path)
        cert = uacrypto.load_certificate(certificate_path)
        pk = uacrypto.load_private_key(private_key_path)
        self.security_policy = policy(server_cert, cert, pk, mode)
        self.uaclient.set_security(self.security_policy)

    def load_client_certificate(self, path):
        """
        load our certificate from file, either pem or der
        """
        self.user_certificate = uacrypto.load_certificate(path)

    def load_private_key(self, path):
        """
        Load user private key. This is used for authenticating using certificate
        """
        self.user_private_key = uacrypto.load_private_key(path)

    def connect_and_get_server_endpoints(self):
        """
        Connect, ask server for endpoints, and disconnect
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            endpoints = self.get_endpoints()
            self.close_secure_channel()
        finally:
            # never leave the socket open, even if one of the steps failed
            self.disconnect_socket()
        return endpoints

    def connect_and_find_servers(self):
        """
        Connect, ask server for a list of known servers, and disconnect
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()  # spec says it should not be necessary to open channel
            servers = self.find_servers()
            self.close_secure_channel()
        finally:
            # never leave the socket open, even if one of the steps failed
            self.disconnect_socket()
        return servers

    def connect_and_find_servers_on_network(self):
        """
        Connect, ask server for a list of known servers on network, and disconnect
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            servers = self.find_servers_on_network()
            self.close_secure_channel()
        finally:
            # never leave the socket open, even if one of the steps failed
            self.disconnect_socket()
        return servers

    def connect(self):
        """
        High level method
        Connect, create and activate session.
        The session is activated with the username and password found
        in the url and with the user certificate, if any.
        If something fails before the session is created, the socket
        is closed again and the exception is re-raised.
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            self.create_session()
        except Exception:
            self.disconnect_socket()  # clean up open socket
            raise
        self.activate_session(username=self.server_url.username, password=self.server_url.password, certificate=self.user_certificate)

    def disconnect(self):
        """
        High level method
        Close session, secure channel and socket.
        The socket is closed even if closing the session or
        the secure channel raises.
        """
        try:
            self.close_session()
            self.close_secure_channel()
        finally:
            self.disconnect_socket()

    def connect_socket(self):
        """
        connect to socket defined in url
        """
        self.uaclient.connect_socket(self.server_url.hostname, self.server_url.port)

    def disconnect_socket(self):
        """
        Close the socket opened by connect_socket
        """
        self.uaclient.disconnect_socket()

    def send_hello(self):
        """
        Send OPC-UA hello to server
        """
        self.uaclient.send_hello(self.server_url.geturl())
        # FIXME check ack returned by send_hello

    def open_secure_channel(self, renew=False):
        """
        Open secure channel, if renew is True, renew channel.
        The lifetime revised by the server is stored in
        self.secure_channel_timeout
        """
        params = ua.OpenSecureChannelParameters()
        params.ClientProtocolVersion = 0
        params.RequestType = ua.SecurityTokenRequestType.Issue
        if renew:
            params.RequestType = ua.SecurityTokenRequestType.Renew
        params.SecurityMode = self.security_policy.Mode
        params.RequestedLifetime = self.secure_channel_timeout
        nonce = utils.create_nonce(self.security_policy.symmetric_key_size)   # length should be equal to the length of key of symmetric encryption
        params.ClientNonce = nonce  # this nonce is used to create a symmetric key
        result = self.uaclient.open_secure_channel(params)
        self.security_policy.make_symmetric_key(nonce, result.ServerNonce)
        self.secure_channel_timeout = result.SecurityToken.RevisedLifetime

    def close_secure_channel(self):
        """
        Close secure channel, return the result of the underlying UaClient call
        """
        return self.uaclient.close_secure_channel()

    def get_endpoints(self):
        """
        Ask server for its endpoints, using the url given to the constructor
        """
        params = ua.GetEndpointsParameters()
        params.EndpointUrl = self.server_url.geturl()
        return self.uaclient.get_endpoints(params)

    def register_server(self, server, discovery_configuration=None):
        """
        register a server to discovery server
        if discovery_configuration is provided, the newer register_server2 service call is used
        """
        serv = ua.RegisteredServer()
        serv.ServerUri = server.application_uri
        serv.ProductUri = server.product_uri
        serv.DiscoveryUrls = [server.endpoint.geturl()]
        serv.ServerType = server.application_type
        serv.ServerNames = [ua.LocalizedText(server.name)]
        serv.IsOnline = True
        if discovery_configuration:
            params = ua.RegisterServer2Parameters()
            params.Server = serv
            params.DiscoveryConfiguration = discovery_configuration
            return self.uaclient.register_server2(params)
        return self.uaclient.register_server(serv)

    def find_servers(self, uris=None):
        """
        send a FindServer request to the server. The answer should be a list of
        servers the server knows about
        A list of uris can be provided, only server having matching uris will be returned
        """
        if uris is None:
            uris = []
        params = ua.FindServersParameters()
        params.EndpointUrl = self.server_url.geturl()
        params.ServerUris = uris
        return self.uaclient.find_servers(params)

    def find_servers_on_network(self):
        """
        send a FindServersOnNetwork request with default parameters to the server
        """
        params = ua.FindServersOnNetworkParameters()
        return self.uaclient.find_servers_on_network(params)

    def create_session(self):
        """
        send a CreateSessionRequest to server with reasonable parameters.
        If you want to modify settings look at code of this method
        and make your own.
        The requested session timeout is self.session_timeout, which is
        then updated with the value revised by the server.
        A KeepAlive thread is started once the session is created.
        Raise ua.UaError if the server certificate does not match the
        one of the security policy
        """
        desc = ua.ApplicationDescription()
        desc.ApplicationUri = self.application_uri
        desc.ProductUri = self.product_uri
        desc.ApplicationName = ua.LocalizedText(self.name)
        desc.ApplicationType = ua.ApplicationType.Client

        params = ua.CreateSessionParameters()
        nonce = utils.create_nonce(32)  # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
        params.ClientNonce = nonce
        params.ClientCertificate = self.security_policy.client_certificate
        params.ClientDescription = desc
        params.EndpointUrl = self.server_url.geturl()
        params.SessionName = self.description + " Session" + str(self._session_counter)
        # use the public member so that API users can change it before connecting,
        # same as secure_channel_timeout in open_secure_channel()
        params.RequestedSessionTimeout = self.session_timeout
        params.MaxResponseMessageSize = 0  # means no max size
        response = self.uaclient.create_session(params)
        # the server signs our certificate (if any) followed by our nonce
        if self.security_policy.client_certificate is None:
            data = nonce
        else:
            data = self.security_policy.client_certificate + nonce
        self.security_policy.asymmetric_cryptography.verify(data, response.ServerSignature.Signature)
        self._server_nonce = response.ServerNonce
        if not self.security_policy.server_certificate:
            self.security_policy.server_certificate = response.ServerCertificate
        elif self.security_policy.server_certificate != response.ServerCertificate:
            raise ua.UaError("Server certificate mismatch")
        # remember PolicyId's: we will use them in activate_session()
        ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
        self._policy_ids = ep.UserIdentityTokens
        self.session_timeout = response.RevisedSessionTimeout
        self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7)  # 0.7 is from spec
        self.keepalive.start()
        return response

    def server_policy_id(self, token_type, default):
        """
        Find PolicyId of server's UserTokenPolicy by token_type.
        Return default if there's no matching UserTokenPolicy.
        """
        for policy in self._policy_ids:
            if policy.TokenType == token_type:
                return policy.PolicyId
        return default

    def server_policy_uri(self, token_type):
        """
        Find SecurityPolicyUri of server's UserTokenPolicy by token_type.
        If SecurityPolicyUri is empty, use default SecurityPolicyUri
        of the endpoint
        """
        for policy in self._policy_ids:
            if policy.TokenType == token_type:
                if policy.SecurityPolicyUri:
                    return policy.SecurityPolicyUri
                # empty URI means "use this endpoint's policy URI"
                return self.security_policy.URI
        return self.security_policy.URI

    def activate_session(self, username=None, password=None, certificate=None):
        """
        Activate session using either username and password or private_key.
        If neither username nor certificate is given, an anonymous token
        is used. If certificate is given, it is used together with
        self.user_private_key, otherwise username and password are used.
        """
        params = ua.ActivateSessionParameters()
        challenge = b""
        if self.security_policy.server_certificate is not None:
            challenge += self.security_policy.server_certificate
        if self._server_nonce is not None:
            challenge += self._server_nonce
        params.ClientSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(challenge)
        params.LocaleIds.append("en")
        if not username and not certificate:
            params.UserIdentityToken = ua.AnonymousIdentityToken()
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, b"anonymous")
        elif certificate:
            params.UserIdentityToken = ua.X509IdentityToken()
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, b"certificate_basic256")
            params.UserIdentityToken.CertificateData = uacrypto.der_from_x509(certificate)
            # specs part 4, 5.6.3.1: the data to sign is created by appending
            # the last serverNonce to the serverCertificate
            sig = uacrypto.sign_sha1(self.user_private_key, challenge)
            params.UserTokenSignature = ua.SignatureData()
            params.UserTokenSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
            params.UserTokenSignature.Signature = sig
        else:
            params.UserIdentityToken = ua.UserNameIdentityToken()
            params.UserIdentityToken.UserName = username
            policy_uri = self.server_policy_uri(ua.UserTokenType.UserName)
            # NOTE: the password argument is tested here, not the password of
            # the url, so that a password given by the caller is always sent
            if not policy_uri or policy_uri == security_policies.POLICY_NONE_URI:
                # see specs part 4, 7.36.3: if the token is NOT encrypted,
                # then the password only contains UTF-8 encoded password
                # and EncryptionAlgorithm is null
                if password:
                    self.logger.warning("Sending plain-text password")
                    params.UserIdentityToken.Password = password
                params.UserIdentityToken.EncryptionAlgorithm = ''
            elif password:
                pubkey = uacrypto.x509_from_der(self.security_policy.server_certificate).public_key()
                # see specs part 4, 7.36.3: if the token is encrypted, password
                # shall be converted to UTF-8 and serialized with server nonce
                passwd = password.encode("utf8")  # works with both python2 and python3
                if self._server_nonce is not None:
                    passwd += self._server_nonce
                etoken = ua.pack_bytes(passwd)
                data, uri = security_policies.encrypt_asymmetric(pubkey, etoken, policy_uri)
                params.UserIdentityToken.Password = data
                params.UserIdentityToken.EncryptionAlgorithm = uri
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, b"username_basic256")
        return self.uaclient.activate_session(params)

    def close_session(self):
        """
        Close session. The keepalive thread, if any, is asked to stop first
        """
        if self.keepalive:
            self.keepalive.stop()
        return self.uaclient.close_session(True)

    def get_root_node(self):
        """
        Get the Node object of the root folder
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))

    def get_objects_node(self):
        """
        Get the Node object of the objects folder
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))

    def get_server_node(self):
        """
        Get the Node object of the server object
        """
        return self.get_node(ua.FourByteNodeId(ua.ObjectIds.Server))

    def get_node(self, nodeid):
        """
        Get node using NodeId object or a string representing a NodeId
        """
        return Node(self.uaclient, nodeid)

    def create_subscription(self, period, handler):
        """
        Create a subscription.
        returns a Subscription object which allow
        to subscribe to events or data on server
        period is used as requested publishing interval
        handler argument is a class with data_change and/or event methods.
        These methods will be called when notification from server are received.
        See example-client.py.
        Do not do expensive/slow or network operation from these methods
        since they are called directly from receiving thread. This is a design choice,
        start another thread if you need to do such a thing.
        """
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = period
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 10000
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.uaclient, params, handler)

    def get_namespace_array(self):
        """
        Read the value of the Server_NamespaceArray node
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        return ns_node.get_value()

    def get_namespace_index(self, uri):
        """
        Return the index of uri in the namespace array read from the server
        """
        uris = self.get_namespace_array()
        return uris.index(uri)

    def delete_nodes(self, nodes, recursive=False):
        """
        Delete the given nodes through the delete_nodes helper of
        opcua.common.manage_nodes, return its result
        """
        return delete_nodes(self.uaclient, nodes, recursive)
476
            
477