Completed
Push — dev ( d958a8...030627 )
by Olivier
05:35 queued 03:22
created

opcua.Client.load_client_certificate()   A

Complexity

Conditions 3

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 8.2077
Metric Value
dl 0
loc 9
ccs 1
cts 6
cp 0.1667
rs 9.6667
cc 3
crap 8.2077
1 1
from __future__ import division  # support for python2
2 1
import os
3 1
from threading import Thread, Condition
4 1
import logging
5 1
try:
6 1
    from urllib.parse import urlparse
7
except ImportError:  # support for python2
8
    from urlparse import urlparse
9
10 1
from opcua import uaprotocol as ua
11 1
from opcua import BinaryClient, Node, Subscription
12 1
from opcua import utils
13 1
from opcua import uacrypto
14
15
16 1
class KeepAlive(Thread):
17
18
    """
19
    Used by Client to keep session opened.
20
    OPCUA defines timeout both for sessions and secure channel
21
    """
22
23 1
    def __init__(self, client, timeout):
24 1
        Thread.__init__(self)
25 1
        self.logger = logging.getLogger(__name__)
26 1
        if timeout == 0:  # means no timeout bu we do not trust such servers
27
            timeout = 360000
28 1
        self.timeout = timeout
29 1
        self.client = client
30 1
        self._dostop = False
31 1
        self._cond = Condition()
32
33 1
    def run(self):
34 1
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
35 1
        server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
36 1
        while not self._dostop:
37 1
            with self._cond:
38 1
                self._cond.wait(self.timeout / 1000)
39 1
            if self._dostop:
40 1
                break
41
            self.logger.debug("renewing channel")
42
            self.client.open_secure_channel(renew=True)
43
            val = server_state.get_value()
44
            self.logger.debug("server state is: %s ", val)
45 1
        self.logger.debug("keepalive thread has stopped")
46
47 1
    def stop(self):
48 1
        self.logger.debug("stoping keepalive thread")
49 1
        with self._cond:
50 1
            self._cond.notify_all()
51 1
        self._dostop = True
52
53
54 1
class Client(object):
55
56
    """
57
    High level client to connect to an OPC-UA server.
58
    This class makes it easy to connect and browse address space.
59
    It attemps to expose as much functionality as possible
60
    but if you want to do to special things you will probably need
61
    to work with the BinaryClient object, available as self.bclient
62
    which offers a raw OPC-UA interface.
63
    """
64
65 1
    def __init__(self, url, timeout=1):
66
        """
67
        used url argument to connect to server.
68
        if you are unsure of url, write at least hostname and port
69
        and call get_endpoints
70
        timeout is the timeout to get an answer for requests to server
71
        public member of this call are available to be set by API users
72
73
        """
74 1
        self.logger = logging.getLogger(__name__)
75 1
        self.server_url = urlparse(url)
76 1
        self.name = "Pure Python Client"
77 1
        self.description = self.name
78 1
        self.application_uri = "urn:freeopcua:client"
79 1
        self.product_uri = "urn:freeopcua.github.no:client"
80 1
        self.security_policy_uri = "http://opcfoundation.org/UA/SecurityPolicy#None"
81 1
        self.security_mode = ua.MessageSecurityMode.None_
82 1
        self.secure_channel_id = None
83 1
        self.default_timeout = 3600000
84 1
        self.secure_channel_timeout = self.default_timeout
85 1
        self.session_timeout = self.default_timeout
86 1
        self._policy_ids = []
87 1
        self.server_certificate = ""
88 1
        self.client_certificate = ""
89 1
        self.private_key = ""
90 1
        self.bclient = BinaryClient(timeout)
91 1
        self._nonce = None
92 1
        self._session_counter = 1
93 1
        self.keepalive = None
94
95 1
    def load_client_certificate(self, path):
96
        """
97
        load our certificate from file, either pem or der
98
        """
99
        _, ext = os.path.splitext(path)
100
        with open(path, "br") as f:
101
            self.client_certificate = f.read()
102
        if ext == ".pem":
103
            self.client_certificate = uacrypto.dem_to_der(self.client_certificate)
104
105 1
    def load_private_key(self, path):
106
        with open(path, "br") as f:
107
            self.private_key = f.read()
108
109 1
    def connect_and_register_server(self):
110
        """
111
        Connect to discovery server, register my server, and disconnect
112
        """
113
        self.connect_socket()
114
        self.send_hello()
115
        self.open_secure_channel()
116
        self.register_server()
117
        self.close_secure_channel()
118
        self.disconnect_socket()
119
120
    def connect_and_get_server_endpoints(self):
121 1
        """
122
        Connect, ask server for endpoints, and disconnect
123
        """
124
        self.connect_socket()
125
        self.send_hello()
126
        self.open_secure_channel()
127
        endpoints = self.get_endpoints()
128
        self.close_secure_channel()
129
        self.disconnect_socket()
130
        return endpoints
131
132
    def connect_and_find_servers(self):
133 1
        """
134
        Connect, ask server for a list of known servers, and disconnect
135
        """
136
        self.connect_socket()
137
        self.send_hello()
138
        self.open_secure_channel()  # spec says it should not be necessary to open channel
139
        servers = self.find_servers()
140
        self.close_secure_channel()
141
        self.disconnect_socket()
142
        return servers
143
144
    def connect_and_find_servers_on_network(self):
145
        """
146
        Connect, ask server for a list of known servers on network, and disconnect
147 1
        """
148
        self.connect_socket()
149
        self.send_hello()
150
        self.open_secure_channel()
151
        servers = self.find_servers_on_network()
152 1
        self.close_secure_channel()
153 1
        self.disconnect_socket()
154 1
        return servers
155 1
156 1
    def connect(self):
157
        """
158 1
        High level method
159
        Connect, create and activate session
160
        """
161
        self.connect_socket()
162
        self.send_hello()
163 1
        self.open_secure_channel()
164 1
        self.create_session()
165 1
        self.activate_session(username=self.server_url.username, password=self.server_url.password, certificate=self.client_certificate)
166
167 1
    def disconnect(self):
168
        """
169
        High level method
170
        Close session, secure channel and socket
171 1
        """
172
        self.close_session()
173 1
        self.close_secure_channel()
174 1
        self.disconnect_socket()
175
176 1
    def connect_socket(self):
177
        """
178
        connect to socket defined in url
179
        """
180 1
        self.bclient.connect_socket(self.server_url.hostname, self.server_url.port)
181
182
    def disconnect_socket(self):
183 1
        self.bclient.disconnect_socket()
184
185
    def send_hello(self):
186
        """
187 1
        Send OPC-UA hello to server
188 1
        """
189 1
        ack = self.bclient.send_hello(self.server_url.geturl())
190 1
        # FIXME check ack
191
192 1
    def open_secure_channel(self, renew=False):
193 1
        """
194 1
        Open secure channel, if renew is True, renew channel
195 1
        """
196 1
        params = ua.OpenSecureChannelParameters()
197
        params.ClientProtocolVersion = 0
198 1
        params.RequestType = ua.SecurityTokenRequestType.Issue
199 1
        if renew:
200
            params.RequestType = ua.SecurityTokenRequestType.Renew
201 1
        params.SecurityMode = self.security_mode
202 1
        params.RequestedLifetime = self.secure_channel_timeout
203 1
        params.ClientNonce = '\x00'
204 1
        result = self.bclient.open_secure_channel(params)
205
        self.secure_channel_timeout = result.SecurityToken.RevisedLifetime
206 1
207
    def close_secure_channel(self):
208
        return self.bclient.close_secure_channel()
209
210 1
    def get_endpoints(self):
211
        params = ua.GetEndpointsParameters()
212
        params.EndpointUrl = self.server_url.geturl()
213
        return self.bclient.get_endpoints(params)
214 1
215 1
    def register_server(self, server_uri, product_uri, endpoint_url):
216 1
        """
217 1
        register a server to discovery server
218 1
        """
219 1
        serv = ua.RegisteredServer()
220
        serv.ServerUri = server_uri
221 1
        serv.ProductUri = product_uri
222 1
        serv.DiscoveryUrls = [endpoint_url]
223 1
        serv.ServerType = ua.ApplicationType.ClientAndServer
224 1
        serv.IsOnline = True
225 1
        self.bclient.register_server(serv)
226 1
227 1
    def register_server2(self, server_uri, product_uri, endpoint_url):
228 1
        """
229 1
        register a server to discovery server, using the newer RegisterServer2 service
230 1
        This is mostly sample code as no known server implement this....
231 1
        """
232 1
        serv = ua.RegisteredServer()
233 1
        serv.ServerUri = server_uri
234
        serv.ProductUri = product_uri
235 1
        serv.DiscoveryUrls = [endpoint_url]
236 1
        serv.ServerType = ua.ApplicationType.ClientAndServer
237 1
        serv.IsOnline = True
238 1
239 1
        params = ua.registerServer2Parameters()
240
        params.Server = serv
241 1
        params.DiscoveryConfiguration
242
        return self.bclient.register_server2(params)
243
244
    def find_servers(self, uris=[]):
245
        """
246 1
        send a FindServer request to the server. The answer should be a list of
247 1
        servers the server knows about
248 1
        A list of uris can be provided, only server having matching uris will be returned
249 1
        """
250
        params = ua.FindServersParameters()
251 1
        params.EndpointUrl = self.server_url.geturl()
252
        params.ServerUris = uris 
253
        return self.bclient.find_servers(params)
254
255 1
    def find_servers_on_network(self):
256 1
        params = ua.FindServersOnNetworkParameters()
257 1
        return self.bclient.find_servers_on_network(params)
258 1
259 1
    def create_session(self):
260 1
        desc = ua.ApplicationDescription()
261
        desc.ApplicationUri = self.application_uri
262
        desc.ProductUri = self.product_uri
263
        desc.ApplicationName = ua.LocalizedText(self.name)
264
        desc.ApplicationType = ua.ApplicationType.Client
265
266
        params = ua.CreateSessionParameters()
267
        params.ClientNonce = utils.create_nonce()
268
        params.ClientCertificate = b''
269 1
        params.ClientDescription = desc
270 1
        params.EndpointUrl = self.server_url.geturl()
271 1
        params.SessionName = self.description + " Session" + str(self._session_counter)
272
        params.RequestedSessionTimeout = 3600000
273
        params.MaxResponseMessageSize = 0  # means no max size
274
        params.ClientCertificate = self.client_certificate
275 1
        response = self.bclient.create_session(params)
276 1
        self.server_certificate = response.ServerCertificate
277 1
        for ep in response.ServerEndpoints:
278
            if urlparse(ep.EndpointUrl).scheme == self.server_url.scheme and ep.SecurityMode == self.security_mode:
279 1
                # remember PolicyId's: we will use them in activate_session()
280
                self._policy_ids = ep.UserIdentityTokens
281
        self.session_timeout = response.RevisedSessionTimeout
282
        self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7)  # 0.7 is from spec
283 1
        self.keepalive.start()
284 1
        return response
285 1
286
    def server_policy_id(self, token_type, default):
287 1
        """
288 1
        Find PolicyId of server's UserTokenPolicy by token_type.
289
        Return default if there's no matching UserTokenPolicy.
290 1
        """
291 1
        for policy in self._policy_ids:
292
            if policy.TokenType == token_type:
293 1
                return policy.PolicyId
294 1
        return default
295
296 1
    def activate_session(self, username=None, password=None, certificate=None):
297
        """
298
        Activate session using either username and password or private_key
299
        """
300 1
        params = ua.ActivateSessionParameters()
301
        params.LocaleIds.append("en")
302 1
        if not username and not certificate:
303
            params.UserIdentityToken = ua.AnonymousIdentityToken()
304
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, b"anonymous")
305
        elif certificate:
306
            params.UserIdentityToken = ua.X509IdentityToken()
307
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, b"certificate_basic256")
308
            params.UserIdentityToken.CertificateData = certificate
309
            sig = uacrypto.sign_sha1(self.private_key, certificate)
310
            params.UserTokenSignature = ua.SignatureData()
311
            params.UserTokenSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
312
            params.UserTokenSignature.Signature = sig
313
        else:
314 1
            params.UserIdentityToken = ua.UserNameIdentityToken()
315 1
            params.UserIdentityToken.UserName = username 
316 1
            if self.server_url.password:
317 1
                pubkey = uacrypto.pubkey_from_dercert(self.server_certificate)
318 1
                data = uacrypto.encrypt_rsa_oaep(pubkey, bytes(password, "utf8"))
319 1
                params.UserIdentityToken.Password = data
320 1
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, b"username_basic256")
321 1
            params.UserIdentityToken.EncryptionAlgorithm = 'http://www.w3.org/2001/04/xmlenc#rsa-oaep'
322
        return self.bclient.activate_session(params)
323 1
324 1
    def close_session(self):
325 1
        """
326
        Close session
327 1
        """
328 1
        if self.keepalive:
329 1
            self.keepalive.stop()
330
        return self.bclient.close_session(True)
331
332
    def get_root_node(self):
333
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))
334
335
    def get_objects_node(self):
336
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))
337
338
    def get_server_node(self):
339
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.Server))
340
341
    def get_node(self, nodeid):
342
        """
343
        Get node using NodeId object or a string representing a NodeId
344
        """
345
        return Node(self.bclient, nodeid)
346
347
    def create_subscription(self, period, handler):
348
        """
349
        Create a subscription.
350
        returns a Subscription object which allow
351
        to subscribe to events or data on server
352
        handler argument is a class with data_change and/or event methods.
353
        These methods will be called when notfication from server are received.
354
        See example-client.py.
355
        Do not do expensive/slow or network operation from these methods 
356
        since they are called directly from receiving thread. This is a design choice,
357
        start another thread if you need to do such a thing.
358
        """
359
        params = ua.CreateSubscriptionParameters()
360
        params.RequestedPublishingInterval = period
361
        params.RequestedLifetimeCount = 3000
362
        params.RequestedMaxKeepAliveCount = 10000
363
        params.MaxNotificationsPerPublish = 4294967295
364
        params.PublishingEnabled = True
365
        params.Priority = 0
366
        return Subscription(self.bclient, params, handler)
367
368
    def get_namespace_array(self):
369
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
370
        return ns_node.get_value()
371
372
    def get_namespace_index(self, uri):
373
        uries = self.get_namespace_array()
374
        return uries.index(uri)
375