Completed
Pull Request — master (#163)
by
unknown
02:50
created

KeepAlive.__init__()   A

Complexity

Conditions 2

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 2.004
Metric Value
cc 2
dl 0
loc 10
ccs 9
cts 10
cp 0.9
crap 2.004
rs 9.4285
1 1
from __future__ import division  # support for python2
2 1
from threading import Thread, Condition
3 1
import logging
4 1
try:
5 1
    from urllib.parse import urlparse
6
except ImportError:  # support for python2
7
    from urlparse import urlparse
8
9 1
from opcua import ua
10 1
from opcua.client.ua_client import UaClient
11 1
from opcua.common.node import Node
12 1
from opcua.common.manage_nodes import delete_nodes
13 1
from opcua.common.subscription import Subscription
14 1
from opcua.common import utils
15 1
from opcua.crypto import security_policies
16 1
use_crypto = True
17 1
try:
18 1
    from opcua.crypto import uacrypto
19 1
except ImportError:
20 1
    print("cryptography is not installed, use of crypto disabled")
21 1
    use_crypto = False
22
23
24 1
class KeepAlive(Thread):

    """
    Used by Client to keep session opened.
    OPCUA defines timeout both for sessions and secure channel.

    Once started, the thread wakes up every ``timeout`` milliseconds,
    renews the secure channel of the client and reads the server state
    node, until stop() is called.
    """

    def __init__(self, client, timeout):
        """
        client is the object whose open_secure_channel(renew=True) and
        get_node() methods are called from the thread.
        timeout is the period between two renewals, in milliseconds.
        A value of 0 is replaced by 360000.
        """
        Thread.__init__(self)
        self.logger = logging.getLogger(__name__)
        if timeout == 0:  # means no timeout but we do not trust such servers
            timeout = 360000
        self.timeout = timeout
        self.daemon = True
        self.client = client
        self._dostop = False
        # guards _dostop and is used to wake up the thread when stopping
        self._cond = Condition()

    def run(self):
        """
        Renew the secure channel periodically until stop() is called.
        """
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
        server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
        while True:
            with self._cond:
                # The flag is tested while holding the lock: a stop()
                # issued just before we start waiting can then never be
                # missed (its notification would otherwise be lost and
                # the thread would sleep for a whole period).
                if not self._dostop:
                    # 1000.0 keeps a float period even without
                    # "from __future__ import division" on python2
                    self._cond.wait(self.timeout / 1000.0)
                if self._dostop:
                    break
            self.logger.debug("renewing channel")
            self.client.open_secure_channel(renew=True)
            val = server_state.get_value()
            self.logger.debug("server state is: %s ", val)
        self.logger.debug("keepalive thread has stopped")

    def stop(self):
        """
        Ask the thread to terminate and wake it up if it is waiting.
        """
        self.logger.debug("stoping keepalive thread")
        with self._cond:
            # set under the lock so that run() sees a consistent flag
            self._dostop = True
            self._cond.notify_all()
61
62
63 1
class Client(object):

    """
    High level client to connect to an OPC-UA server.

    This class makes it easy to connect and browse address space.
    It attempts to expose as much functionality as possible
    but if you want more flexibility it is possible and advised to
    use UaClient object, available as self.uaclient
    which offers the raw OPC-UA services interface.
    """

    def __init__(self, url, timeout=4):
        """
        used url argument to connect to server.
        if you are unsure of url, write at least hostname and port
        and call get_endpoints
        timeout is the timeout to get an answer for requests to server
        public member of this call are available to be set by API users
        """
        self.logger = logging.getLogger(__name__)
        self.server_url = urlparse(url)
        self.name = "Pure Python Client"
        self.description = self.name
        self.application_uri = "urn:freeopcua:client"
        self.product_uri = "urn:freeopcua.github.no:client"
        self.security_policy = ua.SecurityPolicy()
        self.secure_channel_id = None
        self.default_timeout = 3600000
        self.secure_channel_timeout = self.default_timeout
        self.session_timeout = self.default_timeout
        self._policy_ids = []
        # set by create_session(); initialised here so that
        # activate_session() can test it even if no session was created
        self._server_nonce = None
        self.uaclient = UaClient(timeout)
        self.user_certificate = None
        self.user_private_key = None
        self._session_counter = 1
        self.keepalive = None

    @staticmethod
    def find_endpoint(endpoints, security_mode, policy_uri):
        """
        Find endpoint with required security mode and policy URI.
        Only endpoints whose url starts with ua.OPC_TCP_SCHEME are considered.
        Raise ua.UaError if no endpoint matches.
        """
        for ep in endpoints:
            if (ep.EndpointUrl.startswith(ua.OPC_TCP_SCHEME) and
                    ep.SecurityMode == security_mode and
                    ep.SecurityPolicyUri == policy_uri):
                return ep
        raise ua.UaError("No matching endpoints: {}, {}".format(
                         security_mode, policy_uri))

    def set_security_string(self, string):
        """
        Set SecureConnection mode. String format:
        Policy,Mode,certificate,private_key[,server_certificate]
        where Policy is Basic128Rsa15 or Basic256,
            Mode is Sign or SignAndEncrypt
            certificate, private_key and server_certificate are
                paths to .pem or .der files
        An empty string is ignored.
        Call this before connect()
        """
        if not string:
            return
        parts = string.split(',')
        if len(parts) < 4:
            raise ua.UaError('Wrong format: `{}`, expected at least 4 comma-separated values'.format(string))
        policy_class = getattr(security_policies, 'SecurityPolicy' + parts[0])
        mode = getattr(ua.MessageSecurityMode, parts[1])
        # the fifth value is optional: path to the server certificate
        server_certificate_path = parts[4] if len(parts) >= 5 else None
        return self.set_security(policy_class, parts[2], parts[3],
                                 server_certificate_path, mode)

    def set_security(self, policy, certificate_path, private_key_path,
                     server_certificate_path=None,
                     mode=ua.MessageSecurityMode.SignAndEncrypt):
        """
        Set SecureConnection mode.
        If server_certificate_path is None, the server certificate is taken
        from the endpoint matching mode and policy in the list of endpoints
        returned by the server (this connects to the server).
        Call this before connect()
        """
        if server_certificate_path is None:
            # load certificate from server's list of endpoints
            endpoints = self.connect_and_get_server_endpoints()
            endpoint = Client.find_endpoint(endpoints, mode, policy.URI)
            server_cert = uacrypto.x509_from_der(endpoint.ServerCertificate)
        else:
            server_cert = uacrypto.load_certificate(server_certificate_path)
        cert = uacrypto.load_certificate(certificate_path)
        pk = uacrypto.load_private_key(private_key_path)
        self.security_policy = policy(server_cert, cert, pk, mode)
        self.uaclient.set_security(self.security_policy)

    def load_client_certificate(self, path):
        """
        load our certificate from file, either pem or der
        """
        self.user_certificate = uacrypto.load_certificate(path)

    def load_private_key(self, path):
        """
        Load user private key. This is used for authenticating using certificate
        """
        self.user_private_key = uacrypto.load_private_key(path)

    def connect_and_get_server_endpoints(self):
        """
        Connect, ask server for endpoints, and disconnect
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            endpoints = self.get_endpoints()
            self.close_secure_channel()
        finally:
            # never leave the socket open, even if a request failed
            self.disconnect_socket()
        return endpoints

    def connect_and_find_servers(self):
        """
        Connect, ask server for a list of known servers, and disconnect
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()  # spec says it should not be necessary to open channel
            servers = self.find_servers()
            self.close_secure_channel()
        finally:
            # never leave the socket open, even if a request failed
            self.disconnect_socket()
        return servers

    def connect_and_find_servers_on_network(self):
        """
        Connect, ask server for a list of known servers on network, and disconnect
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            servers = self.find_servers_on_network()
            self.close_secure_channel()
        finally:
            # never leave the socket open, even if a request failed
            self.disconnect_socket()
        return servers

    def connect(self):
        """
        High level method
        Connect, create and activate session
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            self.create_session()
        except Exception:
            # clean up the open socket before reporting the error
            self.disconnect_socket()
            raise
        self.activate_session(username=self.server_url.username, password=self.server_url.password, certificate=self.user_certificate)

    def disconnect(self):
        """
        High level method
        Close session, secure channel and socket
        """
        try:
            self.close_session()
            self.close_secure_channel()
        finally:
            # the socket is closed even if the server did not answer
            self.disconnect_socket()

    def connect_socket(self):
        """
        connect to socket defined in url
        """
        self.uaclient.connect_socket(self.server_url.hostname, self.server_url.port)

    def disconnect_socket(self):
        """
        disconnect the socket of the underlying UaClient
        """
        self.uaclient.disconnect_socket()

    def send_hello(self):
        """
        Send OPC-UA hello to server
        """
        # FIXME check the acknowledge returned by send_hello
        self.uaclient.send_hello(self.server_url.geturl())

    def open_secure_channel(self, renew=False):
        """
        Open secure channel, if renew is True, renew channel
        """
        params = ua.OpenSecureChannelParameters()
        params.ClientProtocolVersion = 0
        if renew:
            params.RequestType = ua.SecurityTokenRequestType.Renew
        else:
            params.RequestType = ua.SecurityTokenRequestType.Issue
        params.SecurityMode = self.security_policy.Mode
        params.RequestedLifetime = self.secure_channel_timeout
        # length should be equal to the length of key of symmetric encryption
        nonce = utils.create_nonce(self.security_policy.symmetric_key_size)
        params.ClientNonce = nonce  # this nonce is used to create a symmetric key
        result = self.uaclient.open_secure_channel(params)
        self.security_policy.make_symmetric_key(nonce, result.ServerNonce)
        # remember the lifetime granted by the server
        self.secure_channel_timeout = result.SecurityToken.RevisedLifetime

    def close_secure_channel(self):
        """
        Close the secure channel of the underlying UaClient
        """
        return self.uaclient.close_secure_channel()

    def get_endpoints(self):
        """
        Send a GetEndpoints request for the url of this client
        """
        params = ua.GetEndpointsParameters()
        params.EndpointUrl = self.server_url.geturl()
        return self.uaclient.get_endpoints(params)

    def register_server(self, server, discovery_configuration=None):
        """
        register a server to discovery server
        if discovery_configuration is provided, the newer register_server2 service call is used
        """
        serv = ua.RegisteredServer()
        serv.ServerUri = server.application_uri
        serv.ProductUri = server.product_uri
        serv.DiscoveryUrls = [server.endpoint.geturl()]
        serv.ServerType = server.application_type
        serv.ServerNames = [ua.LocalizedText(server.name)]
        serv.IsOnline = True
        if not discovery_configuration:
            return self.uaclient.register_server(serv)
        params = ua.RegisterServer2Parameters()
        params.Server = serv
        params.DiscoveryConfiguration = discovery_configuration
        return self.uaclient.register_server2(params)

    def find_servers(self, uris=None):
        """
        send a FindServer request to the server. The answer should be a list of
        servers the server knows about
        A list of uris can be provided, only server having matching uris will be returned
        """
        if uris is None:
            uris = []
        params = ua.FindServersParameters()
        params.EndpointUrl = self.server_url.geturl()
        params.ServerUris = uris
        return self.uaclient.find_servers(params)

    def find_servers_on_network(self):
        """
        send a FindServersOnNetwork request with default parameters
        """
        params = ua.FindServersOnNetworkParameters()
        return self.uaclient.find_servers_on_network(params)

    def create_session(self):
        """
        send a CreateSessionRequest to server with reasonable parameters.
        If you want to modify settings look at code of this method
        and make your own
        The requested session timeout is self.session_timeout.
        A KeepAlive thread is started once the session is created.
        """
        desc = ua.ApplicationDescription()
        desc.ApplicationUri = self.application_uri
        desc.ProductUri = self.product_uri
        desc.ApplicationName = ua.LocalizedText(self.name)
        desc.ApplicationType = ua.ApplicationType.Client

        params = ua.CreateSessionParameters()
        nonce = utils.create_nonce(32)  # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
        params.ClientNonce = nonce
        params.ClientCertificate = self.security_policy.client_certificate
        params.ClientDescription = desc
        params.EndpointUrl = self.server_url.geturl()
        params.SessionName = self.description + " Session" + str(self._session_counter)
        # honour the public session_timeout member instead of a fixed value
        params.RequestedSessionTimeout = self.session_timeout
        params.MaxResponseMessageSize = 0  # means no max size
        response = self.uaclient.create_session(params)
        # data signed by the server: our certificate (if any) followed by our nonce
        if self.security_policy.client_certificate is None:
            data = nonce
        else:
            data = self.security_policy.client_certificate + nonce
        self.security_policy.asymmetric_cryptography.verify(data, response.ServerSignature.Signature)
        self._server_nonce = response.ServerNonce
        if not self.security_policy.server_certificate:
            self.security_policy.server_certificate = response.ServerCertificate
        elif self.security_policy.server_certificate != response.ServerCertificate:
            raise ua.UaError("Server certificate mismatch")
        # remember PolicyId's: we will use them in activate_session()
        ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
        self._policy_ids = ep.UserIdentityTokens
        self.session_timeout = response.RevisedSessionTimeout
        self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7)  # 0.7 is from spec
        self.keepalive.start()
        return response

    def server_policy_id(self, token_type, default):
        """
        Find PolicyId of server's UserTokenPolicy by token_type.
        Return default if there's no matching UserTokenPolicy.
        """
        for policy in self._policy_ids:
            if policy.TokenType == token_type:
                return policy.PolicyId
        return default

    def server_policy_uri(self, token_type):
        """
        Find SecurityPolicyUri of server's UserTokenPolicy by token_type.
        If SecurityPolicyUri is empty, use default SecurityPolicyUri
        of the endpoint
        """
        for policy in self._policy_ids:
            if policy.TokenType == token_type:
                # empty URI means "use this endpoint's policy URI"
                return policy.SecurityPolicyUri or self.security_policy.URI
        return self.security_policy.URI

    def activate_session(self, username=None, password=None, certificate=None):
        """
        Activate session using either username and password or private_key.
        Without username and certificate an anonymous token is used;
        a certificate takes precedence over a username.
        """
        params = ua.ActivateSessionParameters()
        # data to sign: server certificate (if any) followed by last server nonce
        challenge = b""
        if self.security_policy.server_certificate is not None:
            challenge += self.security_policy.server_certificate
        if self._server_nonce is not None:
            challenge += self._server_nonce
        params.ClientSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(challenge)
        params.LocaleIds.append("en")
        if not username and not certificate:
            self._add_anonymous_auth(params)
        elif certificate:
            self._add_certificate_auth(params, certificate, challenge)
        else:
            self._add_user_auth(params, username, password)
        return self.uaclient.activate_session(params)

    def _add_anonymous_auth(self, params):
        """
        Fill params with an anonymous identity token
        """
        params.UserIdentityToken = ua.AnonymousIdentityToken()
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, b"anonymous")

    def _add_certificate_auth(self, params, certificate, challenge):
        """
        Fill params with a X509 identity token and its signature
        """
        params.UserIdentityToken = ua.X509IdentityToken()
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, b"certificate_basic256")
        params.UserIdentityToken.CertificateData = uacrypto.der_from_x509(certificate)
        # specs part 4, 5.6.3.1: the data to sign is created by appending
        # the last serverNonce to the serverCertificate
        sig = uacrypto.sign_sha1(self.user_private_key, challenge)
        params.UserTokenSignature = ua.SignatureData()
        params.UserTokenSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.UserTokenSignature.Signature = sig

    def _add_user_auth(self, params, username, password):
        """
        Fill params with a username identity token, password is encrypted
        when the policy of the server requires it
        """
        params.UserIdentityToken = ua.UserNameIdentityToken()
        params.UserIdentityToken.UserName = username
        policy_uri = self.server_policy_uri(ua.UserTokenType.UserName)
        if not policy_uri or policy_uri == security_policies.POLICY_NONE_URI:
            # see specs part 4, 7.36.3: if the token is NOT encrypted,
            # then the password only contains UTF-8 encoded password
            # and EncryptionAlgorithm is null
            # NOTE: the password argument is tested, not the one of the url,
            # so that credentials given by the caller are really sent
            if password:
                self.logger.warning("Sending plain-text password")
                params.UserIdentityToken.Password = password
            params.UserIdentityToken.EncryptionAlgorithm = ''
        elif password:
            pubkey = uacrypto.x509_from_der(self.security_policy.server_certificate).public_key()
            # see specs part 4, 7.36.3: if the token is encrypted, password
            # shall be converted to UTF-8 and serialized with server nonce
            passwd = password.encode("utf8")  # works with python2 and python3
            if self._server_nonce is not None:
                passwd += self._server_nonce
            etoken = ua.pack_bytes(passwd)
            data, uri = security_policies.encrypt_asymmetric(pubkey, etoken, policy_uri)
            params.UserIdentityToken.Password = data
            params.UserIdentityToken.EncryptionAlgorithm = uri
        params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, b"username_basic256")

    def close_session(self):
        """
        Close session
        The keepalive thread, if any, is asked to stop first.
        """
        if self.keepalive:
            self.keepalive.stop()
        return self.uaclient.close_session(True)

    def get_root_node(self):
        """
        Get the Node object for ua.ObjectIds.RootFolder
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))

    def get_objects_node(self):
        """
        Get the Node object for ua.ObjectIds.ObjectsFolder
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))

    def get_server_node(self):
        """
        Get the Node object for ua.ObjectIds.Server
        """
        return self.get_node(ua.FourByteNodeId(ua.ObjectIds.Server))

    def get_node(self, nodeid):
        """
        Get node using NodeId object or a string representing a NodeId
        """
        return Node(self.uaclient, nodeid)

    def create_subscription(self, period, handler):
        """
        Create a subscription.
        returns a Subscription object which allow
        to subscribe to events or data on server
        period is used as the requested publishing interval.
        handler argument is a class with data_change and/or event methods.
        These methods will be called when notification from server are received.
        See example-client.py.
        Do not do expensive/slow or network operation from these methods
        since they are called directly from receiving thread. This is a design choice,
        start another thread if you need to do such a thing.
        """
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = period
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 10000
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.uaclient, params, handler)

    def get_namespace_array(self):
        """
        Read the value of the Server_NamespaceArray node
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        return ns_node.get_value()

    def get_namespace_index(self, uri):
        """
        Return the position of uri in the namespace array of the server
        """
        uris = self.get_namespace_array()
        return uris.index(uri)

    def delete_nodes(self, nodes, recursive=False):
        """
        Delete nodes through the delete_nodes helper using our UaClient
        """
        return delete_nodes(self.uaclient, nodes, recursive)
470
            
471