Completed
Push — master ( 5b15af...b1f4f6 )
by Olivier
02:33
created

opcua.Client.disconnect()   A

Complexity

Conditions 1

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1
Metric Value
dl 0
loc 8
ccs 4
cts 4
cp 1
rs 9.4286
cc 1
crap 1
1 1
from __future__ import division  # support for python2
2 1
import os
3 1
from threading import Thread, Condition
4 1
import logging
5 1
try:
6 1
    from urllib.parse import urlparse
7
except ImportError:  # support for python2
8
    from urlparse import urlparse
9
10 1
from opcua import uaprotocol as ua
11 1
from opcua import BinaryClient, Node, Subscription
12 1
from opcua import utils
13 1
use_crypto = True
14 1
try:
15 1
    from opcua import uacrypto
16 1
except:
17 1
    print("cryptography is not installed, use of crypto disabled")
18 1
    use_crypto = False
19
20
21 1
class KeepAlive(Thread):
22
23
    """
24
    Used by Client to keep session opened.
25
    OPCUA defines timeout both for sessions and secure channel
26
    """
27
28 1
    def __init__(self, client, timeout):
29 1
        Thread.__init__(self)
30 1
        self.logger = logging.getLogger(__name__)
31 1
        if timeout == 0:  # means no timeout bu we do not trust such servers
32
            timeout = 360000
33 1
        self.timeout = timeout
34 1
        self.client = client
35 1
        self._dostop = False
36 1
        self._cond = Condition()
37
38 1
    def run(self):
39 1
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
40 1
        server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
41 1
        while not self._dostop:
42 1
            with self._cond:
43 1
                self._cond.wait(self.timeout / 1000)
44 1
            if self._dostop:
45 1
                break
46
            self.logger.debug("renewing channel")
47
            self.client.open_secure_channel(renew=True)
48
            val = server_state.get_value()
49
            self.logger.debug("server state is: %s ", val)
50 1
        self.logger.debug("keepalive thread has stopped")
51
52 1
    def stop(self):
53 1
        self.logger.debug("stoping keepalive thread")
54 1
        with self._cond:
55 1
            self._cond.notify_all()
56 1
        self._dostop = True
57
58
59 1
class Client(object):
60
61
    """
62
    High level client to connect to an OPC-UA server.
63
    This class makes it easy to connect and browse address space.
64
    It attemps to expose as much functionality as possible
65
    but if you want to do to special things you will probably need
66
    to work with the BinaryClient object, available as self.bclient
67
    which offers a raw OPC-UA interface.
68
    """
69
70 1
    def __init__(self, url, timeout=1, security_policy=ua.SecurityPolicy()):
71
        """
72
        used url argument to connect to server.
73
        if you are unsure of url, write at least hostname and port
74
        and call get_endpoints
75
        timeout is the timeout to get an answer for requests to server
76
        public member of this call are available to be set by API users
77
78
        """
79 1
        self.logger = logging.getLogger(__name__)
80 1
        self.server_url = urlparse(url)
81 1
        self.name = "Pure Python Client"
82 1
        self.description = self.name
83 1
        self.application_uri = "urn:freeopcua:client"
84 1
        self.product_uri = "urn:freeopcua.github.no:client"
85 1
        self.security_policy = security_policy
86 1
        self.secure_channel_id = None
87 1
        self.default_timeout = 3600000
88 1
        self.secure_channel_timeout = self.default_timeout
89 1
        self.session_timeout = self.default_timeout
90 1
        self._policy_ids = []
91 1
        self.bclient = BinaryClient(timeout, security_policy=security_policy)
92 1
        self.server_certificate = None
93 1
        self.client_certificate = None
94 1
        self.private_key = None 
95 1
        self._session_counter = 1
96 1
        self.keepalive = None
97
98 1
    def load_client_certificate(self, path):
99
        """
100
        load our certificate from file, either pem or der
101
        """
102
        self.client_certificate = uacrypto.load_certificate(path)
103
104 1
    def load_private_key(self, path):
105
        self.private_key = uacrypto.load_private_key(path)
106
107 1
    def connect_and_get_server_endpoints(self):
108
        """
109
        Connect, ask server for endpoints, and disconnect
110
        """
111
        self.connect_socket()
112
        self.send_hello()
113
        self.open_secure_channel()
114
        endpoints = self.get_endpoints()
115
        self.close_secure_channel()
116
        self.disconnect_socket()
117
        return endpoints
118
119 1
    def connect_and_find_servers(self):
120
        """
121
        Connect, ask server for a list of known servers, and disconnect
122
        """
123
        self.connect_socket()
124
        self.send_hello()
125
        self.open_secure_channel()  # spec says it should not be necessary to open channel
126
        servers = self.find_servers()
127
        self.close_secure_channel()
128
        self.disconnect_socket()
129
        return servers
130
131 1
    def connect_and_find_servers_on_network(self):
132
        """
133
        Connect, ask server for a list of known servers on network, and disconnect
134
        """
135
        self.connect_socket()
136
        self.send_hello()
137
        self.open_secure_channel()
138
        servers = self.find_servers_on_network()
139
        self.close_secure_channel()
140
        self.disconnect_socket()
141
        return servers
142
143 1
    def connect(self):
144
        """
145
        High level method
146
        Connect, create and activate session
147
        """
148 1
        self.connect_socket()
149 1
        self.send_hello()
150 1
        self.open_secure_channel()
151 1
        self.create_session()
152 1
        self.activate_session(username=self.server_url.username, password=self.server_url.password, certificate=self.client_certificate)
153
154 1
    def disconnect(self):
155
        """
156
        High level method
157
        Close session, secure channel and socket
158
        """
159 1
        self.close_session()
160 1
        self.close_secure_channel()
161 1
        self.disconnect_socket()
162
163 1
    def connect_socket(self):
164
        """
165
        connect to socket defined in url
166
        """
167 1
        self.bclient.connect_socket(self.server_url.hostname, self.server_url.port)
168
169 1
    def disconnect_socket(self):
170 1
        self.bclient.disconnect_socket()
171
172 1
    def send_hello(self):
173
        """
174
        Send OPC-UA hello to server
175
        """
176 1
        ack = self.bclient.send_hello(self.server_url.geturl())
177
        # FIXME check ack
178
179 1
    def open_secure_channel(self, renew=False):
180
        """
181
        Open secure channel, if renew is True, renew channel
182
        """
183 1
        params = ua.OpenSecureChannelParameters()
184 1
        params.ClientProtocolVersion = 0
185 1
        params.RequestType = ua.SecurityTokenRequestType.Issue
186 1
        if renew:
187
            params.RequestType = ua.SecurityTokenRequestType.Renew
188 1
        params.SecurityMode = self.security_policy.Mode
189 1
        params.RequestedLifetime = self.secure_channel_timeout
190 1
        nonce = utils.create_nonce(self.security_policy.symmetric_key_size)   # length should be equal to the length of key of symmetric encryption
191 1
        params.ClientNonce = nonce	# this nonce is used to create a symmetric key
192 1
        result = self.bclient.open_secure_channel(params)
193 1
        self.security_policy.make_symmetric_key(nonce, result.ServerNonce)
194 1
        self.secure_channel_timeout = result.SecurityToken.RevisedLifetime
195
196 1
    def close_secure_channel(self):
197 1
        return self.bclient.close_secure_channel()
198
199 1
    def get_endpoints(self):
200 1
        params = ua.GetEndpointsParameters()
201 1
        params.EndpointUrl = self.server_url.geturl()
202 1
        return self.bclient.get_endpoints(params)
203
204 1
    def register_server(self, server, discovery_configuration=None):
205
        """
206
        register a server to discovery server
207
        if discovery_configuration is provided, the newer register_server2 service call is used
208
        """
209 1
        serv = ua.RegisteredServer()
210 1
        serv.ServerUri = server.application_uri
211 1
        serv.ProductUri = server.product_uri
212 1
        serv.DiscoveryUrls = [server.endpoint.geturl()]
213 1
        serv.ServerType = server.application_type
214 1
        serv.ServerNames = [ua.LocalizedText(server.name)]
215 1
        serv.IsOnline = True
216 1
        if discovery_configuration:
217
            params = ua.RegisterServer2Parameters()
218
            params.Server = serv
219
            params.DiscoveryConfiguration = discovery_configuration
220
            return self.bclient.register_server2(params)
221
        else:
222 1
            return self.bclient.register_server(serv)
223
224 1
    def find_servers(self, uris=None):
225
        """
226
        send a FindServer request to the server. The answer should be a list of
227
        servers the server knows about
228
        A list of uris can be provided, only server having matching uris will be returned
229
        """
230 1
        if uris is None:
231 1
            uris = []
232 1
        params = ua.FindServersParameters()
233 1
        params.EndpointUrl = self.server_url.geturl()
234 1
        params.ServerUris = uris 
235 1
        return self.bclient.find_servers(params)
236
237 1
    def find_servers_on_network(self):
238
        params = ua.FindServersOnNetworkParameters()
239
        return self.bclient.find_servers_on_network(params)
240
241 1
    def create_session(self):
242 1
        desc = ua.ApplicationDescription()
243 1
        desc.ApplicationUri = self.application_uri
244 1
        desc.ProductUri = self.product_uri
245 1
        desc.ApplicationName = ua.LocalizedText(self.name)
246 1
        desc.ApplicationType = ua.ApplicationType.Client
247
248 1
        params = ua.CreateSessionParameters()
249 1
        nonce = utils.create_nonce(32)  # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
250 1
        params.ClientNonce = nonce
251 1
        params.ClientCertificate = self.security_policy.client_certificate
252 1
        params.ClientDescription = desc
253 1
        params.EndpointUrl = self.server_url.geturl()
254 1
        params.SessionName = self.description + " Session" + str(self._session_counter)
255 1
        params.RequestedSessionTimeout = 3600000
256 1
        params.MaxResponseMessageSize = 0  # means no max size
257 1
        response = self.bclient.create_session(params)
258 1
        self.security_policy.asymmetric_cryptography.verify(self.security_policy.client_certificate + nonce, response.ServerSignature.Signature)
259 1
        self._server_nonce = response.ServerNonce
260 1
        self.server_certificate = response.ServerCertificate
261 1
        for ep in response.ServerEndpoints:
262 1
            if urlparse(ep.EndpointUrl).scheme == self.server_url.scheme and ep.SecurityMode == self.security_policy.Mode:
263
                # remember PolicyId's: we will use them in activate_session()
264 1
                self._policy_ids = ep.UserIdentityTokens
265 1
        self.session_timeout = response.RevisedSessionTimeout
266 1
        self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7)  # 0.7 is from spec
267 1
        self.keepalive.start()
268 1
        return response
269
270 1
    def server_policy_id(self, token_type, default):
271
        """
272
        Find PolicyId of server's UserTokenPolicy by token_type.
273
        Return default if there's no matching UserTokenPolicy.
274
        """
275 1
        for policy in self._policy_ids:
276 1
            if policy.TokenType == token_type:
277 1
                return policy.PolicyId
278 1
        return default
279
280 1
    def activate_session(self, username=None, password=None, certificate=None):
281
        """
282
        Activate session using either username and password or private_key
283
        """
284 1
        params = ua.ActivateSessionParameters()
285 1
        challenge = self.security_policy.server_certificate + self._server_nonce
286 1
        params.ClientSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
287 1
        params.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(challenge)
288 1
        params.LocaleIds.append("en")
289 1
        if not username and not certificate:
290 1
            params.UserIdentityToken = ua.AnonymousIdentityToken()
291 1
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, b"anonymous")
292 1
        elif certificate:
293
            params.UserIdentityToken = ua.X509IdentityToken()
294
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, b"certificate_basic256")
295
            params.UserIdentityToken.CertificateData = uacrypto.der_from_x509(self.client_certificate)
296
            sig = uacrypto.sign_sha1(self.private_key, certificate)
297
            params.UserTokenSignature = ua.SignatureData()
298
            params.UserTokenSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
299
            params.UserTokenSignature.Signature = sig
300
        else:
301 1
            params.UserIdentityToken = ua.UserNameIdentityToken()
302 1
            params.UserIdentityToken.UserName = username 
303 1
            if self.server_url.password:
304
                pubkey = self.server_certificate.publick_key()
305
                data = uacrypto.encrypt_basic256(pubkey, bytes(password, "utf8"))
306
                params.UserIdentityToken.Password = data
307 1
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, b"username_basic256")
308 1
            params.UserIdentityToken.EncryptionAlgorithm = 'http://www.w3.org/2001/04/xmlenc#rsa-oaep'
309 1
        return self.bclient.activate_session(params)
310
311 1
    def close_session(self):
312
        """
313
        Close session
314
        """
315 1
        if self.keepalive:
316 1
            self.keepalive.stop()
317 1
        return self.bclient.close_session(True)
318
319 1
    def get_root_node(self):
320 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))
321
322 1
    def get_objects_node(self):
323 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))
324
325 1
    def get_server_node(self):
326 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.Server))
327
328 1
    def get_node(self, nodeid):
329
        """
330
        Get node using NodeId object or a string representing a NodeId
331
        """
332 1
        return Node(self.bclient, nodeid)
333
334 1
    def create_subscription(self, period, handler):
335
        """
336
        Create a subscription.
337
        returns a Subscription object which allow
338
        to subscribe to events or data on server
339
        handler argument is a class with data_change and/or event methods.
340
        These methods will be called when notfication from server are received.
341
        See example-client.py.
342
        Do not do expensive/slow or network operation from these methods 
343
        since they are called directly from receiving thread. This is a design choice,
344
        start another thread if you need to do such a thing.
345
        """
346 1
        params = ua.CreateSubscriptionParameters()
347 1
        params.RequestedPublishingInterval = period
348 1
        params.RequestedLifetimeCount = 3000
349 1
        params.RequestedMaxKeepAliveCount = 10000
350 1
        params.MaxNotificationsPerPublish = 10000
351 1
        params.PublishingEnabled = True
352 1
        params.Priority = 0
353 1
        return Subscription(self.bclient, params, handler)
354
355 1
    def get_namespace_array(self):
356 1
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
357 1
        return ns_node.get_value()
358
359 1
    def get_namespace_index(self, uri):
360 1
        uries = self.get_namespace_array()
361
        return uries.index(uri)
362