Completed
Push — dev ( a35960...17cd27 )
by Olivier
02:25
created

opcua.Client.connect()   A

Complexity

Conditions 1

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 1
Metric Value
dl 0
loc 10
ccs 6
cts 6
cp 1
rs 9.4286
cc 1
crap 1
1 1
from __future__ import division  # support for python2
2 1
import os
3 1
from threading import Thread, Condition
4 1
import logging
5 1
try:
6 1
    from urllib.parse import urlparse
7
except ImportError:  # support for python2
8
    from urlparse import urlparse
9
10 1
from opcua import uaprotocol as ua
11 1
from opcua import BinaryClient, Node, Subscription
12 1
from opcua import utils
13 1
from opcua import uacrypto
14
15
16 1
class KeepAlive(Thread):

    """
    Used by Client to keep session opened.
    OPCUA defines timeout both for sessions and secure channel.
    The thread periodically renews the secure channel and reads the
    server state until stop() is called.
    """

    def __init__(self, client, timeout):
        """
        client is the object used to renew the channel and read the server state
        timeout is the period between two renewals, in milliseconds
        """
        Thread.__init__(self)
        self.logger = logging.getLogger(__name__)
        if timeout == 0:  # means no timeout but we do not trust such servers
            timeout = 360000
        self.timeout = timeout
        self.client = client
        self._dostop = False
        self._cond = Condition()

    def run(self):
        """
        Renew the secure channel every self.timeout milliseconds until stopped
        """
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
        server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
        while True:
            with self._cond:
                # The flag is tested under the lock, so a stop() issued before
                # we start waiting is seen here instead of being a lost wake-up.
                if not self._dostop:
                    self._cond.wait(self.timeout / 1000.0)
                if self._dostop:
                    break
            self.logger.debug("renewing channel")
            self.client.open_secure_channel(renew=True)
            val = server_state.get_value()
            self.logger.debug("server state is: %s ", val)
        self.logger.debug("keepalive thread has stopped")

    def stop(self):
        """
        Ask the thread to exit and wake it up if it is waiting
        """
        self.logger.debug("stoping keepalive thread")
        with self._cond:
            # Set the flag before notifying and under the lock: the woken
            # thread must observe it, otherwise it would renew once more and
            # sleep for another full period.
            self._dostop = True
            self._cond.notify_all()
52
53
54 1
class Client(object):

    """
    High level client to connect to an OPC-UA server.
    This class makes it easy to connect and browse address space.
    It attempts to expose as much functionality as possible
    but if you want to do special things you will probably need
    to work with the BinaryClient object, available as self.bclient
    which offers a raw OPC-UA interface.
    """

    def __init__(self, url, timeout=1):
        """
        use url argument to connect to server.
        if you are unsure of url, write at least hostname and port
        and call get_endpoints
        timeout is the timeout to get an answer for requests to server
        public members of this class are available to be set by API users
        """
        self.logger = logging.getLogger(__name__)
        self.server_url = urlparse(url)
        self.name = "Pure Python Client"
        self.description = self.name
        self.application_uri = "urn:freeopcua:client"
        self.product_uri = "urn:freeopcua.github.no:client"
        self.security_policy_uri = "http://opcfoundation.org/UA/SecurityPolicy#None"
        self.security_mode = ua.MessageSecurityMode.None_
        self.secure_channel_id = None
        self.default_timeout = 3600000
        self.secure_channel_timeout = self.default_timeout
        self.session_timeout = self.default_timeout
        self._policy_ids = []
        self.server_certificate = ""
        self.client_certificate = ""
        self.private_key = ""
        self.bclient = BinaryClient(timeout)
        self._nonce = None
        self._session_counter = 1
        self.keepalive = None

    def load_client_certificate(self, path):
        """
        load our certificate from file, either pem or der
        """
        _, ext = os.path.splitext(path)
        # "rb" and not "br": python2 rejects modes that do not start with r, w or a
        with open(path, "rb") as f:
            self.client_certificate = f.read()
        if ext == ".pem":
            self.client_certificate = uacrypto.dem_to_der(self.client_certificate)

    def load_private_key(self, path):
        """
        load our private key from file, content is stored as read
        """
        with open(path, "rb") as f:
            self.private_key = f.read()

    def _discovery_request(self, request):
        """
        Connect, open a secure channel, call request() and disconnect.
        The socket is closed even if one of the steps raises
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            result = request()
            self.close_secure_channel()
        finally:
            self.disconnect_socket()
        return result

    def get_server_endpoints(self):
        """
        Connect, ask server for endpoints, and disconnect
        """
        return self._discovery_request(self.get_endpoints)

    def find_all_servers(self):
        """
        Connect, ask server for a list of known servers, and disconnect
        """
        return self._discovery_request(self.find_servers)

    def find_all_servers_on_network(self):
        """
        Connect, ask server for a list of known servers on network, and disconnect
        """
        return self._discovery_request(self.find_servers_on_network)

    def connect(self):
        """
        High level method
        Connect, create and activate session.
        If a step fails after the socket is connected, the keepalive thread
        is stopped and the socket closed before the exception propagates
        """
        self.connect_socket()
        connected = False
        try:
            self.send_hello()
            self.open_secure_channel()
            self.create_session()
            self.activate_session(username=self.server_url.username, password=self.server_url.password, certificate=self.client_certificate)
            connected = True
        finally:
            if not connected:
                # create_session() may already have started the keepalive thread
                if self.keepalive:
                    self.keepalive.stop()
                self.disconnect_socket()

    def disconnect(self):
        """
        High level method
        Close session, secure channel and socket
        """
        self.close_session()
        self.close_secure_channel()
        self.disconnect_socket()

    def connect_socket(self):
        """
        connect to socket defined in url
        """
        self.bclient.connect_socket(self.server_url.hostname, self.server_url.port)

    def disconnect_socket(self):
        """
        disconnect the socket of the underlying BinaryClient
        """
        self.bclient.disconnect_socket()

    def send_hello(self):
        """
        Send OPC-UA hello to server
        """
        ack = self.bclient.send_hello(self.server_url.geturl())
        # FIXME check ack

    def open_secure_channel(self, renew=False):
        """
        Open secure channel, if renew is True, renew channel.
        The lifetime revised by the server is stored in self.secure_channel_timeout
        """
        params = ua.OpenSecureChannelParameters()
        params.ClientProtocolVersion = 0
        if renew:
            params.RequestType = ua.SecurityTokenRequestType.Renew
        else:
            params.RequestType = ua.SecurityTokenRequestType.Issue
        params.SecurityMode = self.security_mode
        params.RequestedLifetime = self.secure_channel_timeout
        params.ClientNonce = '\x00'
        result = self.bclient.open_secure_channel(params)
        self.secure_channel_timeout = result.SecurityToken.RevisedLifetime

    def close_secure_channel(self):
        """
        Close secure channel, returns the result of the BinaryClient call
        """
        return self.bclient.close_secure_channel()

    def get_endpoints(self):
        """
        Ask server for its endpoints, a secure channel must be opened
        """
        params = ua.GetEndpointsParameters()
        params.EndpointUrl = self.server_url.geturl()
        return self.bclient.get_endpoints(params)

    def find_servers(self):
        """
        Send a FindServers request with default parameters
        """
        params = ua.FindServersParameters()
        return self.bclient.find_servers(params)

    def find_servers_on_network(self):
        """
        Send a FindServersOnNetwork request with default parameters
        """
        params = ua.FindServersOnNetworkParameters()
        return self.bclient.find_servers_on_network(params)

    def create_session(self):
        """
        Create a session and start the keepalive thread.
        Stores the server certificate, the user identity tokens of the
        endpoint matching our security mode and the revised session timeout.
        Returns the response from server
        """
        desc = ua.ApplicationDescription()
        desc.ApplicationUri = self.application_uri
        desc.ProductUri = self.product_uri
        desc.ApplicationName = ua.LocalizedText(self.name)
        desc.ApplicationType = ua.ApplicationType.Client

        params = ua.CreateSessionParameters()
        params.ClientNonce = utils.create_nonce()
        params.ClientCertificate = self.client_certificate
        params.ClientDescription = desc
        params.EndpointUrl = self.server_url.geturl()
        params.SessionName = self.description + " Session" + str(self._session_counter)
        # use the public member so API users can change the requested timeout
        params.RequestedSessionTimeout = self.session_timeout
        params.MaxResponseMessageSize = 0  # means no max size
        response = self.bclient.create_session(params)
        self.server_certificate = response.ServerCertificate
        for ep in response.ServerEndpoints:
            if ep.SecurityMode == self.security_mode:
                # remember PolicyId's: we will use them in activate_session()
                self._policy_ids = ep.UserIdentityTokens
        self.session_timeout = response.RevisedSessionTimeout
        self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7)  # 0.7 is from spec
        self.keepalive.start()
        return response

    def activate_session(self, username=None, password=None, certificate=None):
        """
        Activate session using either username and password or certificate.
        Without username and certificate the session is activated as anonymous.
        With a certificate, it is signed using self.private_key.
        With a username, password is encrypted with the public key of the
        server certificate received in create_session()
        """
        params = ua.ActivateSessionParameters()
        params.LocaleIds.append("en")
        if not username and not certificate:
            params.UserIdentityToken = ua.AnonymousIdentityToken()
            params.UserIdentityToken.PolicyId = b"anonymous"
        elif certificate:
            params.UserIdentityToken = ua.X509IdentityToken()
            params.UserIdentityToken.PolicyId = b"certificate_basic256"
            params.UserIdentityToken.CertificateData = certificate
            sig = uacrypto.sign_sha1(self.private_key, certificate)
            params.UserTokenSignature = ua.SignatureData()
            params.UserTokenSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
            params.UserTokenSignature.Signature = sig
        else:
            params.UserIdentityToken = ua.UserNameIdentityToken()
            params.UserIdentityToken.UserName = username
            # test the argument, not the url: callers may pass a password
            # which is not part of the url
            if password:
                pubkey = uacrypto.pubkey_from_dercert(self.server_certificate)
                # str.encode works on both python2 and python3
                data = uacrypto.encrypt_rsa_oaep(pubkey, password.encode("utf8"))
                params.UserIdentityToken.Password = data
            params.UserIdentityToken.PolicyId = b"username_basic256"
            params.UserIdentityToken.EncryptionAlgorithm = 'http://www.w3.org/2001/04/xmlenc#rsa-oaep'
        return self.bclient.activate_session(params)

    def close_session(self):
        """
        Stop the keepalive thread if any and close session
        """
        if self.keepalive:
            self.keepalive.stop()
        return self.bclient.close_session(True)

    def get_root_node(self):
        """
        Get the Node object of the root folder
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))

    def get_objects_node(self):
        """
        Get the Node object of the objects folder
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))

    def get_server_node(self):
        """
        Get the Node object of the server object
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.Server))

    def get_node(self, nodeid):
        """
        Get node using NodeId object or a string representing a NodeId
        """
        return Node(self.bclient, nodeid)

    def create_subscription(self, period, handler):
        """
        Create a subscription.
        returns a Subscription object which allows
        to subscribe to events or data on server
        period is used as the requested publishing interval.
        handler argument is a class with data_change and/or event methods.
        These methods will be called when notifications from server are received.
        See example-client.py.
        Do not do expensive/slow or network operation from these methods
        since they are called directly from receiving thread. This is a design choice,
        start another thread if you need to do such a thing.
        """
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = period
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 4294967295
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.bclient, params, handler)

    def get_namespace_array(self):
        """
        Read the value of the server namespace array node
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        return ns_node.get_value()

    def get_namespace_index(self, uri):
        """
        Return the index of uri in the server namespace array.
        Raises ValueError if uri is not in the array
        """
        uries = self.get_namespace_array()
        return uries.index(uri)
320