Completed
Push — dev ( 3b343f...d958a8 )
by Olivier
04:35 queued 02:25
created

opcua.Client.get_endpoints()   A

Complexity

Conditions 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1
Metric Value
dl 0
loc 4
ccs 4
cts 4
cp 1
rs 10
cc 1
crap 1
1 1
from __future__ import division  # support for python2
2 1
import os
3 1
from threading import Thread, Condition
4 1
import logging
5 1
try:
6 1
    from urllib.parse import urlparse
7
except ImportError:  # support for python2
8
    from urlparse import urlparse
9
10 1
from opcua import uaprotocol as ua
11 1
from opcua import BinaryClient, Node, Subscription
12 1
from opcua import utils
13 1
from opcua import uacrypto
14
15
16 1
class KeepAlive(Thread):
17
18
    """
19
    Used by Client to keep session opened.
20
    OPCUA defines timeout both for sessions and secure channel
21
    """
22
23 1
    def __init__(self, client, timeout):
24 1
        Thread.__init__(self)
25 1
        self.logger = logging.getLogger(__name__)
26 1
        if timeout == 0:  # means no timeout bu we do not trust such servers
27
            timeout = 360000
28 1
        self.timeout = timeout
29 1
        self.client = client
30 1
        self._dostop = False
31 1
        self._cond = Condition()
32
33 1
    def run(self):
34 1
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
35 1
        server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
36 1
        while not self._dostop:
37 1
            with self._cond:
38 1
                self._cond.wait(self.timeout / 1000)
39 1
            if self._dostop:
40 1
                break
41
            self.logger.debug("renewing channel")
42
            self.client.open_secure_channel(renew=True)
43
            val = server_state.get_value()
44
            self.logger.debug("server state is: %s ", val)
45 1
        self.logger.debug("keepalive thread has stopped")
46
47 1
    def stop(self):
48 1
        self.logger.debug("stoping keepalive thread")
49 1
        with self._cond:
50 1
            self._cond.notify_all()
51 1
        self._dostop = True
52
53
54 1
class Client(object):
55
56
    """
57
    High level client to connect to an OPC-UA server.
58
    This class makes it easy to connect and browse address space.
59
    It attemps to expose as much functionality as possible
60
    but if you want to do to special things you will probably need
61
    to work with the BinaryClient object, available as self.bclient
62
    which offers a raw OPC-UA interface.
63
    """
64
65 1
    def __init__(self, url, timeout=1):
66
        """
67
        used url argument to connect to server.
68
        if you are unsure of url, write at least hostname and port
69
        and call get_endpoints
70
        timeout is the timeout to get an answer for requests to server
71
        public member of this call are available to be set by API users
72
73
        """
74 1
        self.logger = logging.getLogger(__name__)
75 1
        self.server_url = urlparse(url)
76 1
        self.name = "Pure Python Client"
77 1
        self.description = self.name
78 1
        self.application_uri = "urn:freeopcua:client"
79 1
        self.product_uri = "urn:freeopcua.github.no:client"
80 1
        self.security_policy_uri = "http://opcfoundation.org/UA/SecurityPolicy#None"
81 1
        self.security_mode = ua.MessageSecurityMode.None_
82 1
        self.secure_channel_id = None
83 1
        self.default_timeout = 3600000
84 1
        self.secure_channel_timeout = self.default_timeout
85 1
        self.session_timeout = self.default_timeout
86 1
        self._policy_ids = []
87 1
        self.server_certificate = ""
88 1
        self.client_certificate = ""
89 1
        self.private_key = ""
90 1
        self.bclient = BinaryClient(timeout)
91 1
        self._nonce = None
92 1
        self._session_counter = 1
93 1
        self.keepalive = None
94
95 1
    def load_client_certificate(self, path):
96
        """
97
        load our certificate from file, either pem or der
98
        """
99
        _, ext = os.path.splitext(path)
100
        with open(path, "br") as f:
101
            self.client_certificate = f.read()
102
        if ext == ".pem":
103
            self.client_certificate = uacrypto.dem_to_der(self.client_certificate)
104
105 1
    def load_private_key(self, path):
106
        with open(path, "br") as f:
107
            self.private_key = f.read()
108
109 1
    def get_server_endpoints(self):
110
        """
111
        Connect, ask server for endpoints, and disconnect
112
        """
113
        self.connect_socket()
114
        self.send_hello()
115
        self.open_secure_channel()
116
        endpoints = self.get_endpoints()
117
        self.close_secure_channel()
118
        self.disconnect_socket()
119
        return endpoints
120
121 1
    def find_all_servers(self):
122
        """
123
        Connect, ask server for a list of known servers, and disconnect
124
        """
125
        self.connect_socket()
126
        self.send_hello()
127
        self.open_secure_channel()
128
        servers = self.find_servers()
129
        self.close_secure_channel()
130
        self.disconnect_socket()
131
        return servers
132
133 1
    def find_all_servers_on_network(self):
134
        """
135
        Connect, ask server for a list of known servers on network, and disconnect
136
        """
137
        self.connect_socket()
138
        self.send_hello()
139
        self.open_secure_channel()
140
        servers = self.find_servers_on_network()
141
        self.close_secure_channel()
142
        self.disconnect_socket()
143
        return servers
144
145
146
147 1
    def connect(self):
148
        """
149
        High level method
150
        Connect, create and activate session
151
        """
152 1
        self.connect_socket()
153 1
        self.send_hello()
154 1
        self.open_secure_channel()
155 1
        self.create_session()
156 1
        self.activate_session(username=self.server_url.username, password=self.server_url.password, certificate=self.client_certificate)
157
158 1
    def disconnect(self):
159
        """
160
        High level method
161
        Close session, secure channel and socket
162
        """
163 1
        self.close_session()
164 1
        self.close_secure_channel()
165 1
        self.disconnect_socket()
166
167 1
    def connect_socket(self):
168
        """
169
        connect to socket defined in url
170
        """
171 1
        self.bclient.connect_socket(self.server_url.hostname, self.server_url.port)
172
173 1
    def disconnect_socket(self):
174 1
        self.bclient.disconnect_socket()
175
176 1
    def send_hello(self):
177
        """
178
        Send OPC-UA hello to server
179
        """
180 1
        ack = self.bclient.send_hello(self.server_url.geturl())
181
        # FIXME check ack
182
183 1
    def open_secure_channel(self, renew=False):
184
        """
185
        Open secure channel, if renew is True, renew channel
186
        """
187 1
        params = ua.OpenSecureChannelParameters()
188 1
        params.ClientProtocolVersion = 0
189 1
        params.RequestType = ua.SecurityTokenRequestType.Issue
190 1
        if renew:
191
            params.RequestType = ua.SecurityTokenRequestType.Renew
192 1
        params.SecurityMode = self.security_mode
193 1
        params.RequestedLifetime = self.secure_channel_timeout
194 1
        params.ClientNonce = '\x00'
195 1
        result = self.bclient.open_secure_channel(params)
196 1
        self.secure_channel_timeout = result.SecurityToken.RevisedLifetime
197
198 1
    def close_secure_channel(self):
199 1
        return self.bclient.close_secure_channel()
200
201 1
    def get_endpoints(self):
202 1
        params = ua.GetEndpointsParameters()
203 1
        params.EndpointUrl = self.server_url.geturl()
204 1
        return self.bclient.get_endpoints(params)
205
206 1
    def find_servers(self):
207
        params = ua.FindServersParameters()
208
        return self.bclient.find_servers(params)
209
210 1
    def find_servers_on_network(self):
211
        params = ua.FindServersOnNetworkParameters()
212
        return self.bclient.find_servers_on_network(params)
213
214 1
    def create_session(self):
215 1
        desc = ua.ApplicationDescription()
216 1
        desc.ApplicationUri = self.application_uri
217 1
        desc.ProductUri = self.product_uri
218 1
        desc.ApplicationName = ua.LocalizedText(self.name)
219 1
        desc.ApplicationType = ua.ApplicationType.Client
220
221 1
        params = ua.CreateSessionParameters()
222 1
        params.ClientNonce = utils.create_nonce()
223 1
        params.ClientCertificate = b''
224 1
        params.ClientDescription = desc
225 1
        params.EndpointUrl = self.server_url.geturl()
226 1
        params.SessionName = self.description + " Session" + str(self._session_counter)
227 1
        params.RequestedSessionTimeout = 3600000
228 1
        params.MaxResponseMessageSize = 0  # means no max size
229 1
        params.ClientCertificate = self.client_certificate
230 1
        response = self.bclient.create_session(params)
231 1
        self.server_certificate = response.ServerCertificate
232 1
        for ep in response.ServerEndpoints:
233 1
            if urlparse(ep.EndpointUrl).scheme == self.server_url.scheme and ep.SecurityMode == self.security_mode:
234
                # remember PolicyId's: we will use them in activate_session()
235 1
                self._policy_ids = ep.UserIdentityTokens
236 1
        self.session_timeout = response.RevisedSessionTimeout
237 1
        self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7)  # 0.7 is from spec
238 1
        self.keepalive.start()
239 1
        return response
240
241 1
    def server_policy_id(self, token_type, default):
242
        """
243
        Find PolicyId of server's UserTokenPolicy by token_type.
244
        Return default if there's no matching UserTokenPolicy.
245
        """
246 1
        for policy in self._policy_ids:
247 1
            if policy.TokenType == token_type:
248 1
                return policy.PolicyId
249 1
        return default
250
251 1
    def activate_session(self, username=None, password=None, certificate=None):
252
        """
253
        Activate session using either username and password or private_key
254
        """
255 1
        params = ua.ActivateSessionParameters()
256 1
        params.LocaleIds.append("en")
257 1
        if not username and not certificate:
258 1
            params.UserIdentityToken = ua.AnonymousIdentityToken()
259 1
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, b"anonymous")
260 1
        elif certificate:
261
            params.UserIdentityToken = ua.X509IdentityToken()
262
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, b"certificate_basic256")
263
            params.UserIdentityToken.CertificateData = certificate
264
            sig = uacrypto.sign_sha1(self.private_key, certificate)
265
            params.UserTokenSignature = ua.SignatureData()
266
            params.UserTokenSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
267
            params.UserTokenSignature.Signature = sig
268
        else:
269 1
            params.UserIdentityToken = ua.UserNameIdentityToken()
270 1
            params.UserIdentityToken.UserName = username 
271 1
            if self.server_url.password:
272
                pubkey = uacrypto.pubkey_from_dercert(self.server_certificate)
273
                data = uacrypto.encrypt_rsa_oaep(pubkey, bytes(password, "utf8"))
274
                params.UserIdentityToken.Password = data
275 1
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, b"username_basic256")
276 1
            params.UserIdentityToken.EncryptionAlgorithm = 'http://www.w3.org/2001/04/xmlenc#rsa-oaep'
277 1
        return self.bclient.activate_session(params)
278
279 1
    def close_session(self):
280
        """
281
        Close session
282
        """
283 1
        if self.keepalive:
284 1
            self.keepalive.stop()
285 1
        return self.bclient.close_session(True)
286
287 1
    def get_root_node(self):
288 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))
289
290 1
    def get_objects_node(self):
291 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))
292
293 1
    def get_server_node(self):
294 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.Server))
295
296 1
    def get_node(self, nodeid):
297
        """
298
        Get node using NodeId object or a string representing a NodeId
299
        """
300 1
        return Node(self.bclient, nodeid)
301
302 1
    def create_subscription(self, period, handler):
303
        """
304
        Create a subscription.
305
        returns a Subscription object which allow
306
        to subscribe to events or data on server
307
        handler argument is a class with data_change and/or event methods.
308
        These methods will be called when notfication from server are received.
309
        See example-client.py.
310
        Do not do expensive/slow or network operation from these methods 
311
        since they are called directly from receiving thread. This is a design choice,
312
        start another thread if you need to do such a thing.
313
        """
314 1
        params = ua.CreateSubscriptionParameters()
315 1
        params.RequestedPublishingInterval = period
316 1
        params.RequestedLifetimeCount = 3000
317 1
        params.RequestedMaxKeepAliveCount = 10000
318 1
        params.MaxNotificationsPerPublish = 4294967295
319 1
        params.PublishingEnabled = True
320 1
        params.Priority = 0
321 1
        return Subscription(self.bclient, params, handler)
322
323 1
    def get_namespace_array(self):
324 1
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
325 1
        return ns_node.get_value()
326
327 1
    def get_namespace_index(self, uri):
328 1
        uries = self.get_namespace_array()
329
        return uries.index(uri)
330