Passed
Push — dev ( 3fa5c0...bfa3a0 )
by Olivier
02:27
created

opcua.client.Client.delete_nodes()   A

Complexity

Conditions 2

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 2
Metric Value
cc 2
dl 0
loc 8
ccs 8
cts 8
cp 1
crap 2
rs 9.4285
1 1
from __future__ import division  # support for python2
2 1
from threading import Thread, Condition
3 1
import logging
4 1
try:
5 1
    from urllib.parse import urlparse
6
except ImportError:  # support for python2
7
    from urlparse import urlparse
8
9 1
from opcua import ua
10 1
from opcua.client.ua_client import UaClient
11 1
from opcua.common.node import Node
12 1
from opcua.common.subscription import Subscription
13 1
from opcua.common import utils
14 1
from opcua.crypto import security_policies
15 1
use_crypto = True
16 1
try:
17 1
    from opcua.crypto import uacrypto
18 1
except ImportError:
19 1
    print("cryptography is not installed, use of crypto disabled")
20 1
    use_crypto = False
21
22
23 1
class KeepAlive(Thread):
24
25
    """
26
    Used by Client to keep session opened.
27
    OPCUA defines timeout both for sessions and secure channel
28
    """
29
30 1
    def __init__(self, client, timeout):
31 1
        Thread.__init__(self)
32 1
        self.logger = logging.getLogger(__name__)
33 1
        if timeout == 0:  # means no timeout bu we do not trust such servers
34
            timeout = 360000
35 1
        self.timeout = timeout
36 1
        self.client = client
37 1
        self._dostop = False
38 1
        self._cond = Condition()
39
40 1
    def run(self):
41 1
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
42 1
        server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
43 1
        while not self._dostop:
44 1
            with self._cond:
45 1
                self._cond.wait(self.timeout / 1000)
46 1
            if self._dostop:
47 1
                break
48
            self.logger.debug("renewing channel")
49
            self.client.open_secure_channel(renew=True)
50
            val = server_state.get_value()
51
            self.logger.debug("server state is: %s ", val)
52 1
        self.logger.debug("keepalive thread has stopped")
53
54 1
    def stop(self):
55 1
        self.logger.debug("stoping keepalive thread")
56 1
        self._dostop = True
57 1
        with self._cond:
58 1
            self._cond.notify_all()
59
60
61 1
class Client(object):
62
63
    """
64
    High level client to connect to an OPC-UA server.
65
    This class makes it easy to connect and browse address space.
66
    It attemps to expose as much functionality as possible
67
    but if you want to do to special things you will probably need
68
    to work with the UaClient object, available as self.uaclient
69
    which offers a raw OPC-UA interface.
70
    """
71
72 1
    def __init__(self, url, timeout=1):
73
        """
74
        used url argument to connect to server.
75
        if you are unsure of url, write at least hostname and port
76
        and call get_endpoints
77
        timeout is the timeout to get an answer for requests to server
78
        public member of this call are available to be set by API users
79
80
        """
81 1
        self.logger = logging.getLogger(__name__)
82 1
        self.server_url = urlparse(url)
83 1
        self.name = "Pure Python Client"
84 1
        self.description = self.name
85 1
        self.application_uri = "urn:freeopcua:client"
86 1
        self.product_uri = "urn:freeopcua.github.no:client"
87 1
        self.security_policy = ua.SecurityPolicy()
88 1
        self.secure_channel_id = None
89 1
        self.default_timeout = 3600000
90 1
        self.secure_channel_timeout = self.default_timeout
91 1
        self.session_timeout = self.default_timeout
92 1
        self._policy_ids = []
93 1
        self.uaclient = UaClient(timeout)
94 1
        self.user_certificate = None
95 1
        self.user_private_key = None
96 1
        self._session_counter = 1
97 1
        self.keepalive = None
98
99 1
    @staticmethod
100
    def find_endpoint(endpoints, security_mode, policy_uri):
101
        """
102
        Find endpoint with required security mode and policy URI
103
        """
104 1
        for ep in endpoints:
105 1
            if (ep.EndpointUrl.startswith(ua.OPC_TCP_SCHEME) and
106
                    ep.SecurityMode == security_mode and
107
                    ep.SecurityPolicyUri == policy_uri):
108 1
                return ep
109
        raise ValueError("No matching endpoints: {}, {}".format(
110
                         security_mode, policy_uri))
111
112 1
    def set_security_string(self, string):
113
        """
114
        Set SecureConnection mode. String format:
115
        Policy,Mode,certificate,private_key[,server_private_key]
116
        where Policy is Basic128Rsa15 or Basic256,
117
            Mode is Sign or SignAndEncrypt
118
            certificate, private_key and server_private_key are
119
                paths to .pem or .der files
120
        Call this before connect()
121
        """
122
        if not string:
123
            return
124
        parts = string.split(',')
125
        if len(parts) < 4:
126
            raise UaError('Wrong format: `{}`, expected at least 4 '
127
                    'comma-separated values'.format(string))
128
        policy_class = getattr(security_policies, 'SecurityPolicy' + parts[0])
129
        mode = getattr(ua.MessageSecurityMode, parts[1])
130
        return self.set_security(policy_class, parts[2], parts[3],
131
                                 parts[4] if len(parts) >= 5 else None, mode)
132
133 1
    def set_security(self, policy, certificate_path, private_key_path,
134
                     server_certificate_path=None,
135
                     mode=ua.MessageSecurityMode.SignAndEncrypt):
136
        """
137
        Set SecureConnection mode.
138
        Call this before connect()
139
        """
140
        if server_certificate_path is None:
141
            # load certificate from server's list of endpoints
142
            endpoints = self.connect_and_get_server_endpoints()
143
            endpoint = Client.find_endpoint(endpoints, mode, policy.URI)
144
            server_cert = uacrypto.x509_from_der(endpoint.ServerCertificate)
145
        else:
146
            server_cert = uacrypto.load_certificate(server_certificate_path)
147
        cert = uacrypto.load_certificate(certificate_path)
148
        pk = uacrypto.load_private_key(private_key_path)
149
        self.security_policy = policy(server_cert, cert, pk, mode)
150
        self.uaclient.set_security(self.security_policy)
151
152 1
    def load_client_certificate(self, path):
153
        """
154
        load our certificate from file, either pem or der
155
        """
156
        self.user_certificate = uacrypto.load_certificate(path)
157
158 1
    def load_private_key(self, path):
159
        self.user_private_key = uacrypto.load_private_key(path)
160
161 1
    def connect_and_get_server_endpoints(self):
162
        """
163
        Connect, ask server for endpoints, and disconnect
164
        """
165
        self.connect_socket()
166
        self.send_hello()
167
        self.open_secure_channel()
168
        endpoints = self.get_endpoints()
169
        self.close_secure_channel()
170
        self.disconnect_socket()
171
        return endpoints
172
173 1
    def connect_and_find_servers(self):
174
        """
175
        Connect, ask server for a list of known servers, and disconnect
176
        """
177
        self.connect_socket()
178
        self.send_hello()
179
        self.open_secure_channel()  # spec says it should not be necessary to open channel
180
        servers = self.find_servers()
181
        self.close_secure_channel()
182
        self.disconnect_socket()
183
        return servers
184
185 1
    def connect_and_find_servers_on_network(self):
186
        """
187
        Connect, ask server for a list of known servers on network, and disconnect
188
        """
189
        self.connect_socket()
190
        self.send_hello()
191
        self.open_secure_channel()
192
        servers = self.find_servers_on_network()
193
        self.close_secure_channel()
194
        self.disconnect_socket()
195
        return servers
196
197 1
    def connect(self):
198
        """
199
        High level method
200
        Connect, create and activate session
201
        """
202 1
        self.connect_socket()
203 1
        self.send_hello()
204 1
        self.open_secure_channel()
205 1
        self.create_session()
206 1
        self.activate_session(username=self.server_url.username, password=self.server_url.password, certificate=self.user_certificate)
207
208 1
    def disconnect(self):
209
        """
210
        High level method
211
        Close session, secure channel and socket
212
        """
213 1
        self.close_session()
214 1
        self.close_secure_channel()
215 1
        self.disconnect_socket()
216
217 1
    def connect_socket(self):
218
        """
219
        connect to socket defined in url
220
        """
221 1
        self.uaclient.connect_socket(self.server_url.hostname, self.server_url.port)
222
223 1
    def disconnect_socket(self):
224 1
        self.uaclient.disconnect_socket()
225
226 1
    def send_hello(self):
227
        """
228
        Send OPC-UA hello to server
229
        """
230 1
        ack = self.uaclient.send_hello(self.server_url.geturl())
231
        # FIXME check ack
232
233 1
    def open_secure_channel(self, renew=False):
234
        """
235
        Open secure channel, if renew is True, renew channel
236
        """
237 1
        params = ua.OpenSecureChannelParameters()
238 1
        params.ClientProtocolVersion = 0
239 1
        params.RequestType = ua.SecurityTokenRequestType.Issue
240 1
        if renew:
241
            params.RequestType = ua.SecurityTokenRequestType.Renew
242 1
        params.SecurityMode = self.security_policy.Mode
243 1
        params.RequestedLifetime = self.secure_channel_timeout
244 1
        nonce = utils.create_nonce(self.security_policy.symmetric_key_size)   # length should be equal to the length of key of symmetric encryption
245 1
        params.ClientNonce = nonce	# this nonce is used to create a symmetric key
246 1
        result = self.uaclient.open_secure_channel(params)
247 1
        self.security_policy.make_symmetric_key(nonce, result.ServerNonce)
248 1
        self.secure_channel_timeout = result.SecurityToken.RevisedLifetime
249
250 1
    def close_secure_channel(self):
251 1
        return self.uaclient.close_secure_channel()
252
253 1
    def get_endpoints(self):
254 1
        params = ua.GetEndpointsParameters()
255 1
        params.EndpointUrl = self.server_url.geturl()
256 1
        return self.uaclient.get_endpoints(params)
257
258 1
    def register_server(self, server, discovery_configuration=None):
259
        """
260
        register a server to discovery server
261
        if discovery_configuration is provided, the newer register_server2 service call is used
262
        """
263 1
        serv = ua.RegisteredServer()
264 1
        serv.ServerUri = server.application_uri
265 1
        serv.ProductUri = server.product_uri
266 1
        serv.DiscoveryUrls = [server.endpoint.geturl()]
267 1
        serv.ServerType = server.application_type
268 1
        serv.ServerNames = [ua.LocalizedText(server.name)]
269 1
        serv.IsOnline = True
270 1
        if discovery_configuration:
271
            params = ua.RegisterServer2Parameters()
272
            params.Server = serv
273
            params.DiscoveryConfiguration = discovery_configuration
274
            return self.uaclient.register_server2(params)
275
        else:
276 1
            return self.uaclient.register_server(serv)
277
278 1
    def find_servers(self, uris=None):
279
        """
280
        send a FindServer request to the server. The answer should be a list of
281
        servers the server knows about
282
        A list of uris can be provided, only server having matching uris will be returned
283
        """
284 1
        if uris is None:
285 1
            uris = []
286 1
        params = ua.FindServersParameters()
287 1
        params.EndpointUrl = self.server_url.geturl()
288 1
        params.ServerUris = uris
289 1
        return self.uaclient.find_servers(params)
290
291 1
    def find_servers_on_network(self):
292
        params = ua.FindServersOnNetworkParameters()
293
        return self.uaclient.find_servers_on_network(params)
294
295 1
    def create_session(self):
296 1
        desc = ua.ApplicationDescription()
297 1
        desc.ApplicationUri = self.application_uri
298 1
        desc.ProductUri = self.product_uri
299 1
        desc.ApplicationName = ua.LocalizedText(self.name)
300 1
        desc.ApplicationType = ua.ApplicationType.Client
301
302 1
        params = ua.CreateSessionParameters()
303 1
        nonce = utils.create_nonce(32)  # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
304 1
        params.ClientNonce = nonce
305 1
        params.ClientCertificate = self.security_policy.client_certificate
306 1
        params.ClientDescription = desc
307 1
        params.EndpointUrl = self.server_url.geturl()
308 1
        params.SessionName = self.description + " Session" + str(self._session_counter)
309 1
        params.RequestedSessionTimeout = 3600000
310 1
        params.MaxResponseMessageSize = 0  # means no max size
311 1
        response = self.uaclient.create_session(params)
312 1
        self.security_policy.asymmetric_cryptography.verify(self.security_policy.client_certificate + nonce, response.ServerSignature.Signature)
313 1
        self._server_nonce = response.ServerNonce
314 1
        if not self.security_policy.server_certificate:
315 1
            self.security_policy.server_certificate = response.ServerCertificate
316
        elif self.security_policy.server_certificate != response.ServerCertificate:
317
            raise UaError("Server certificate mismatch")
318
        # remember PolicyId's: we will use them in activate_session()
319 1
        ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
320 1
        self._policy_ids = ep.UserIdentityTokens
321 1
        self.session_timeout = response.RevisedSessionTimeout
322 1
        self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7)  # 0.7 is from spec
323 1
        self.keepalive.start()
324 1
        return response
325
326 1
    def server_policy_id(self, token_type, default):
327
        """
328
        Find PolicyId of server's UserTokenPolicy by token_type.
329
        Return default if there's no matching UserTokenPolicy.
330
        """
331 1
        for policy in self._policy_ids:
332 1
            if policy.TokenType == token_type:
333 1
                return policy.PolicyId
334 1
        return default
335
336 1
    def activate_session(self, username=None, password=None, certificate=None):
337
        """
338
        Activate session using either username and password or private_key
339
        """
340 1
        params = ua.ActivateSessionParameters()
341 1
        challenge = self.security_policy.server_certificate + self._server_nonce
342 1
        params.ClientSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
343 1
        params.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(challenge)
344 1
        params.LocaleIds.append("en")
345 1
        if not username and not certificate:
346 1
            params.UserIdentityToken = ua.AnonymousIdentityToken()
347 1
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, b"anonymous")
348 1
        elif certificate:
349
            params.UserIdentityToken = ua.X509IdentityToken()
350
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, b"certificate_basic256")
351
            params.UserIdentityToken.CertificateData = uacrypto.der_from_x509(certificate)
352
            # specs part 4, 5.6.3.1: the data to sign is created by appending
353
            # the last serverNonce to the serverCertificate
354
            sig = uacrypto.sign_sha1(self.user_private_key, challenge)
355
            params.UserTokenSignature = ua.SignatureData()
356
            params.UserTokenSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
357
            params.UserTokenSignature.Signature = sig
358
        else:
359 1
            params.UserIdentityToken = ua.UserNameIdentityToken()
360 1
            params.UserIdentityToken.UserName = username
361 1
            if self.server_url.password:
362
                pubkey = uacrypto.x509_from_der(self.security_policy.server_certificate).public_key()
363
                # see specs part 4, 7.36.3: if the token is encrypted, password
364
                # shall be converted to UTF-8 and serialized with server nonce
365
                etoken = ua.pack_bytes(bytes(password, "utf8") + self._server_nonce)
366
                #data = uacrypto.encrypt_basic256(pubkey, etoken)
367
                data = uacrypto.encrypt_rsa_oaep(pubkey, etoken)
368
                params.UserIdentityToken.Password = data
369 1
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, b"username_basic256")
370 1
            params.UserIdentityToken.EncryptionAlgorithm = 'http://www.w3.org/2001/04/xmlenc#rsa-oaep'
371 1
        return self.uaclient.activate_session(params)
372
373 1
    def close_session(self):
374
        """
375
        Close session
376
        """
377 1
        if self.keepalive:
378 1
            self.keepalive.stop()
379 1
        return self.uaclient.close_session(True)
380
381 1
    def get_root_node(self):
382 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))
383
384 1
    def get_objects_node(self):
385 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))
386
387 1
    def get_server_node(self):
388 1
        return self.get_node(ua.FourByteNodeId(ua.ObjectIds.Server))
389
390 1
    def get_node(self, nodeid):
391
        """
392
        Get node using NodeId object or a string representing a NodeId
393
        """
394 1
        return Node(self.uaclient, nodeid)
395
396 1
    def create_subscription(self, period, handler):
397
        """
398
        Create a subscription.
399
        returns a Subscription object which allow
400
        to subscribe to events or data on server
401
        handler argument is a class with data_change and/or event methods.
402
        These methods will be called when notfication from server are received.
403
        See example-client.py.
404
        Do not do expensive/slow or network operation from these methods
405
        since they are called directly from receiving thread. This is a design choice,
406
        start another thread if you need to do such a thing.
407
        """
408 1
        params = ua.CreateSubscriptionParameters()
409 1
        params.RequestedPublishingInterval = period
410 1
        params.RequestedLifetimeCount = 3000
411 1
        params.RequestedMaxKeepAliveCount = 10000
412 1
        params.MaxNotificationsPerPublish = 10000
413 1
        params.PublishingEnabled = True
414 1
        params.Priority = 0
415 1
        return Subscription(self.uaclient, params, handler)
416
417 1
    def get_namespace_array(self):
418 1
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
419 1
        return ns_node.get_value()
420
421 1
    def get_namespace_index(self, uri):
422 1
        uries = self.get_namespace_array()
423 1
        return uries.index(uri)
424
425 1
    def delete_nodes(self, nodes):
426 1
        nodestodelete = []
427 1
        for node in nodes:
428 1
            it = ua.DeleteNodesItem()
429 1
            it.NodeId = node.nodeid
430 1
            it.DeleteTargetReferences = True
431 1
            nodestodelete.append(it)
432 1
        return self.uaclient.delete_nodes(nodestodelete)
433
            
434