Completed
Pull Request — master (#70)
by Olivier
03:03
created

opcua.Client.get_server_node()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1
Metric Value
dl 0
loc 2
ccs 2
cts 2
cp 1
rs 10
cc 1
crap 1
1 1
from __future__ import division  # support for python2
2 1
import os
3 1
from threading import Thread, Condition
4 1
import logging
5 1
try:
6 1
    from urllib.parse import urlparse
7
except ImportError:  # support for python2
8
    from urlparse import urlparse
9
10 1
from opcua import uaprotocol as ua
11 1
from opcua import BinaryClient, Node, Subscription
12 1
from opcua import utils
13 1
from opcua import uacrypto
14
15
16 1
class KeepAlive(Thread):
17
18
    """
19
    Used by Client to keep session opened.
20
    OPCUA defines timeout both for sessions and secure channel
21
    """
22
23 1
    def __init__(self, client, timeout):
24 1
        Thread.__init__(self)
25 1
        self.logger = logging.getLogger(__name__)
26 1
        if timeout == 0:  # means no timeout bu we do not trust such servers
27
            timeout = 360000
28 1
        self.timeout = timeout
29 1
        self.client = client
30 1
        self._dostop = False
31 1
        self._cond = Condition()
32
33 1
    def run(self):
34 1
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
35 1
        server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
36 1
        while not self._dostop:
37 1
            with self._cond:
38 1
                self._cond.wait(self.timeout / 1000)
39 1
            if self._dostop:
40 1
                break
41
            self.logger.debug("renewing channel")
42
            self.client.open_secure_channel(renew=True)
43
            val = server_state.get_value()
44
            self.logger.debug("server state is: %s ", val)
45 1
        self.logger.debug("keepalive thread has stopped")
46
47 1
    def stop(self):
48 1
        self.logger.debug("stoping keepalive thread")
49 1
        with self._cond:
50 1
            self._cond.notify_all()
51 1
        self._dostop = True
52
53
54 1
class Client(object):
55
56
    """
57
    High level client to connect to an OPC-UA server.
58
    This class makes it easy to connect and browse address space.
59
    It attemps to expose as much functionality as possible
60
    but if you want to do to special things you will probably need
61
    to work with the BinaryClient object, available as self.bclient
62
    which offers a raw OPC-UA interface.
63
    """
64
65 1
    def __init__(self, url, timeout=1):
66
        """
67
        used url argument to connect to server.
68
        if you are unsure of url, write at least hostname and port
69
        and call get_endpoints
70
        timeout is the timeout to get an answer for requests to server
71
        public member of this call are available to be set by API users
72
73
        """
74 1
        self.logger = logging.getLogger(__name__)
75 1
        self.server_url = urlparse(url)
76 1
        self.name = "Pure Python Client"
77 1
        self.description = self.name
78 1
        self.application_uri = "urn:freeopcua:client"
79 1
        self.product_uri = "urn:freeopcua.github.no:client"
80 1
        self.security_policy_uri = "http://opcfoundation.org/UA/SecurityPolicy#None"
81 1
        self.security_mode = ua.MessageSecurityMode.None_
82 1
        self.secure_channel_id = None
83 1
        self.default_timeout = 3600000
84 1
        self.secure_channel_timeout = self.default_timeout
85 1
        self.session_timeout = self.default_timeout
86 1
        self._policy_ids = []
87 1
        self.server_certificate = ""
88 1
        self.client_certificate = ""
89 1
        self.private_key = ""
90 1
        self.bclient = BinaryClient(timeout)
91 1
        self._nonce = None
92 1
        self._session_counter = 1
93 1
        self.keepalive = None
94
95 1
    def load_client_certificate(self, path):
96
        """
97
        load our certificate from file, either pem or der
98
        """
99
        _, ext = os.path.splitext(path)
100
        with open(path, "br") as f:
101
            self.client_certificate = f.read()
102
        if ext == ".pem":
103
            self.client_certificate = uacrypto.dem_to_der(self.client_certificate)
104
105 1
    def load_private_key(self, path):
106
        with open(path, "br") as f:
107
            self.private_key = f.read()
108
109 1
    def connect_and_register_server(self):
110
        """
111
        Connect to discovery server, register my server, and disconnect
112
        """
113
        self.connect_socket()
114
        self.send_hello()
115
        self.open_secure_channel()
116
        self.register_server()
117
        self.close_secure_channel()
118
        self.disconnect_socket()
119
120 1
    def connect_and_get_server_endpoints(self):
121
        """
122
        Connect, ask server for endpoints, and disconnect
123
        """
124
        self.connect_socket()
125
        self.send_hello()
126
        self.open_secure_channel()
127
        endpoints = self.get_endpoints()
128
        self.close_secure_channel()
129
        self.disconnect_socket()
130
        return endpoints
131
132 1
    def connect_and_find_servers(self):
133
        """
134
        Connect, ask server for a list of known servers, and disconnect
135
        """
136
        self.connect_socket()
137
        self.send_hello()
138
        self.open_secure_channel()  # spec says it should not be necessary to open channel
139
        servers = self.find_servers()
140
        self.close_secure_channel()
141
        self.disconnect_socket()
142
        return servers
143
144 1
    def connect_and_find_servers_on_network(self):
145
        """
146
        Connect, ask server for a list of known servers on network, and disconnect
147
        """
148
        self.connect_socket()
149
        self.send_hello()
150
        self.open_secure_channel()
151
        servers = self.find_servers_on_network()
152
        self.close_secure_channel()
153
        self.disconnect_socket()
154
        return servers
155
156 1
    def connect(self):
157
        """
158
        High level method
159
        Connect, create and activate session
160
        """
161 1
        self.connect_socket()
162 1
        self.send_hello()
163 1
        self.open_secure_channel()
164 1
        self.create_session()
165 1
        self.activate_session(username=self.server_url.username, password=self.server_url.password, certificate=self.client_certificate)
166
167 1
    def disconnect(self):
168
        """
169
        High level method
170
        Close session, secure channel and socket
171
        """
172 1
        self.close_session()
173 1
        self.close_secure_channel()
174 1
        self.disconnect_socket()
175
176 1
    def connect_socket(self):
177
        """
178
        connect to socket defined in url
179
        """
180 1
        self.bclient.connect_socket(self.server_url.hostname, self.server_url.port)
181
182 1
    def disconnect_socket(self):
183 1
        self.bclient.disconnect_socket()
184
185 1
    def send_hello(self):
186
        """
187
        Send OPC-UA hello to server
188
        """
189 1
        ack = self.bclient.send_hello(self.server_url.geturl())
190
        # FIXME check ack
191
192 1
    def open_secure_channel(self, renew=False):
193
        """
194
        Open secure channel, if renew is True, renew channel
195
        """
196 1
        params = ua.OpenSecureChannelParameters()
197 1
        params.ClientProtocolVersion = 0
198 1
        params.RequestType = ua.SecurityTokenRequestType.Issue
199 1
        if renew:
200
            params.RequestType = ua.SecurityTokenRequestType.Renew
201 1
        params.SecurityMode = self.security_mode
202 1
        params.RequestedLifetime = self.secure_channel_timeout
203 1
        params.ClientNonce = '\x00'
204 1
        result = self.bclient.open_secure_channel(params)
205 1
        self.secure_channel_timeout = result.SecurityToken.RevisedLifetime
206
207 1
    def close_secure_channel(self):
208 1
        return self.bclient.close_secure_channel()
209
210 1
    def get_endpoints(self):
211 1
        params = ua.GetEndpointsParameters()
212 1
        params.EndpointUrl = self.server_url.geturl()
213 1
        return self.bclient.get_endpoints(params)
214
215 1
    def register_server(self, server, discovery_configuration=None):
216
        """
217
        register a server to discovery server
218
        if discovery_configuration is provided, the newer register_server2 service call is used
219
        """
220 1
        serv = ua.RegisteredServer()
221 1
        serv.ServerUri = server.application_uri
222 1
        serv.ProductUri = server.product_uri
223 1
        serv.DiscoveryUrls = [server.endpoint.geturl()]
224 1
        serv.ServerType = server.application_type
225 1
        serv.ServerNames = [ua.LocalizedText(server.name)]
226 1
        serv.IsOnline = True
227 1
        if discovery_configuration:
228
            params = ua.registerServer2Parameters()
229
            params.Server = serv
230
            params.DiscoveryConfiguration
231
            return self.bclient.register_server2(params)
232
        else:
233 1
            return self.bclient.register_server(serv)
234
235 1
    def find_servers(self, uris=[]):
236
        """
237
        send a FindServer request to the server. The answer should be a list of
238
        servers the server knows about
239
        A list of uris can be provided, only server having matching uris will be returned
240
        """
241 1
        params = ua.FindServersParameters()
242 1
        params.EndpointUrl = self.server_url.geturl()
243 1
        params.ServerUris = uris 
244 1
        return self.bclient.find_servers(params)
245
246 1
    def find_servers_on_network(self):
247
        params = ua.FindServersOnNetworkParameters()
248
        return self.bclient.find_servers_on_network(params)
249
250 1
    def create_session(self):
251 1
        desc = ua.ApplicationDescription()
252 1
        desc.ApplicationUri = self.application_uri
253 1
        desc.ProductUri = self.product_uri
254 1
        desc.ApplicationName = ua.LocalizedText(self.name)
255 1
        desc.ApplicationType = ua.ApplicationType.Client
256
257 1
        params = ua.CreateSessionParameters()
258 1
        params.ClientNonce = utils.create_nonce()
259 1
        params.ClientCertificate = b''
260 1
        params.ClientDescription = desc
261 1
        params.EndpointUrl = self.server_url.geturl()
262 1
        params.SessionName = self.description + " Session" + str(self._session_counter)
263 1
        params.RequestedSessionTimeout = 3600000
264 1
        params.MaxResponseMessageSize = 0  # means no max size
265 1
        params.ClientCertificate = self.client_certificate
266 1
        response = self.bclient.create_session(params)
267 1
        self.server_certificate = response.ServerCertificate
268 1
        for ep in response.ServerEndpoints:
269 1
            if urlparse(ep.EndpointUrl).scheme == self.server_url.scheme and ep.SecurityMode == self.security_mode:
270
                # remember PolicyId's: we will use them in activate_session()
271 1
                self._policy_ids = ep.UserIdentityTokens
272 1
        self.session_timeout = response.RevisedSessionTimeout
273 1
        self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7)  # 0.7 is from spec
274 1
        self.keepalive.start()
275 1
        return response
276
277 1
    def server_policy_id(self, token_type, default):
278
        """
279
        Find PolicyId of server's UserTokenPolicy by token_type.
280
        Return default if there's no matching UserTokenPolicy.
281
        """
282 1
        for policy in self._policy_ids:
283 1
            if policy.TokenType == token_type:
284 1
                return policy.PolicyId
285 1
        return default
286
287 1
    def activate_session(self, username=None, password=None, certificate=None):
288
        """
289
        Activate session using either username and password or private_key
290
        """
291 1
        params = ua.ActivateSessionParameters()
292 1
        params.LocaleIds.append("en")
293 1
        if not username and not certificate:
294 1
            params.UserIdentityToken = ua.AnonymousIdentityToken()
295 1
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, b"anonymous")
296 1
        elif certificate:
297
            params.UserIdentityToken = ua.X509IdentityToken()
298
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, b"certificate_basic256")
299
            params.UserIdentityToken.CertificateData = certificate
300
            sig = uacrypto.sign_sha1(self.private_key, certificate)
301
            params.UserTokenSignature = ua.SignatureData()
302
            params.UserTokenSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
303
            params.UserTokenSignature.Signature = sig
304
        else:
305 1
            params.UserIdentityToken = ua.UserNameIdentityToken()
306 1
            params.UserIdentityToken.UserName = username 
307 1
            if self.server_url.password:
308
                pubkey = uacrypto.pubkey_from_dercert(self.server_certificate)
309
                data = uacrypto.encrypt_rsa_oaep(pubkey, bytes(password, "utf8"))
310
                params.UserIdentityToken.Password = data
311 1
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, b"username_basic256")
312 1
            params.UserIdentityToken.EncryptionAlgorithm = 'http://www.w3.org/2001/04/xmlenc#rsa-oaep'
313 1
        return self.bclient.activate_session(params)
314
315 1
    def close_session(self):
316
        """
317
        Close session
318
        """
319 1
        if self.keepalive:
320 1
            self.keepalive.stop()
321 1
        return self.bclient.close_session(True)
322
323 1
    def get_root_node(self):
324 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))
325
326 1
    def get_objects_node(self):
327 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))
328
329 1
    def get_server_node(self):
330 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.Server))
331
332 1
    def get_node(self, nodeid):
333
        """
334
        Get node using NodeId object or a string representing a NodeId
335
        """
336 1
        return Node(self.bclient, nodeid)
337
338 1
    def create_subscription(self, period, handler):
339
        """
340
        Create a subscription.
341
        returns a Subscription object which allow
342
        to subscribe to events or data on server
343
        handler argument is a class with data_change and/or event methods.
344
        These methods will be called when notfication from server are received.
345
        See example-client.py.
346
        Do not do expensive/slow or network operation from these methods 
347
        since they are called directly from receiving thread. This is a design choice,
348
        start another thread if you need to do such a thing.
349
        """
350 1
        params = ua.CreateSubscriptionParameters()
351 1
        params.RequestedPublishingInterval = period
352 1
        params.RequestedLifetimeCount = 3000
353 1
        params.RequestedMaxKeepAliveCount = 10000
354 1
        params.MaxNotificationsPerPublish = 4294967295
355 1
        params.PublishingEnabled = True
356 1
        params.Priority = 0
357 1
        return Subscription(self.bclient, params, handler)
358
359 1
    def get_namespace_array(self):
360 1
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
361 1
        return ns_node.get_value()
362
363 1
    def get_namespace_index(self, uri):
364 1
        uries = self.get_namespace_array()
365
        return uries.index(uri)
366