Completed
Push — master ( a80a7b...be43b4 )
by Olivier
02:17
created

opcua.Client.create_subscription()   A

Complexity

Conditions 1

Size

Total Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 1
Metric Value
dl 0
loc 20
ccs 9
cts 9
cp 1
rs 9.4286
cc 1
crap 1
1 1
from __future__ import division  # support for python2
2 1
import os
3 1
from threading import Thread, Condition
4 1
import logging
5 1
try:
6 1
    from urllib.parse import urlparse
7
except ImportError:  # support for python2
8
    from urlparse import urlparse
9
10 1
from opcua import uaprotocol as ua
11 1
from opcua import BinaryClient, Node, Subscription
12 1
from opcua import utils
13 1
# Crypto support is optional: fall back to an unencrypted-only client when the
# uacrypto helper module (or, per the message below, pycrypto) cannot be imported.
use_crypto = True
try:
    from opcua import uacrypto
except ImportError:
    # Only a failed import disables crypto; any other error (KeyboardInterrupt,
    # a genuine bug inside uacrypto, ...) is allowed to propagate.
    print("pycrypto is not installed, use of crypto disabled")
    use_crypto = False
19
20
21 1
class KeepAlive(Thread):

    """
    Used by Client to keep session opened.
    OPCUA defines timeout both for sessions and secure channel

    The thread wakes up every ``timeout`` milliseconds, renews the secure
    channel of ``client`` and reads the server state node. Call stop() to
    terminate it.
    """

    def __init__(self, client, timeout):
        """
        client is the object whose channel is renewed; it must provide
        get_node() and open_secure_channel(renew=True).
        timeout is the renewal period in milliseconds, 0 is replaced by
        a fixed period of 360000 ms.
        """
        Thread.__init__(self)
        self.logger = logging.getLogger(__name__)
        if timeout == 0:  # means no timeout but we do not trust such servers
            timeout = 360000
        self.timeout = timeout
        self.client = client
        self._dostop = False
        # guards _dostop and lets stop() interrupt the periodic wait
        self._cond = Condition()

    def run(self):
        """
        Thread body: renew channel periodically until stop() is called
        """
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
        server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
        while True:
            with self._cond:
                # The flag is tested under the lock both before and after
                # waiting, so a stop() issued before we reach wait() or
                # while we are waking up is never missed.
                if not self._dostop:
                    self._cond.wait(self.timeout / 1000)
                if self._dostop:
                    break
            self.logger.debug("renewing channel")
            self.client.open_secure_channel(renew=True)
            val = server_state.get_value()
            self.logger.debug("server state is: %s ", val)
        self.logger.debug("keepalive thread has stopped")

    def stop(self):
        """
        Ask the thread to terminate and wake it up if it is waiting.
        Does not wait for the thread to finish.
        """
        self.logger.debug("stoping keepalive thread")
        with self._cond:
            # set the flag before notifying, and under the lock, so the
            # woken thread is guaranteed to see it
            self._dostop = True
            self._cond.notify_all()
57
58
59 1
class Client(object):

    """
    High level client to connect to an OPC-UA server.
    This class makes it easy to connect and browse address space.
    It attempts to expose as much functionality as possible
    but if you want to do special things you will probably need
    to work with the BinaryClient object, available as self.bclient
    which offers a raw OPC-UA interface.
    """

    def __init__(self, url, timeout=1, security_policy=None):
        """
        used url argument to connect to server.
        if you are unsure of url, write at least hostname and port
        and call get_endpoints
        timeout is the timeout to get an answer for requests to server
        security_policy is the policy used for the secure channel, a new
        ua.SecurityPolicy() is created for this client when it is None
        public member of this call are available to be set by API users
        """
        if security_policy is None:
            # One policy object per client: open_secure_channel() calls
            # make_symmetric_key() on it, so it must not be shared through
            # a default argument evaluated once at definition time.
            security_policy = ua.SecurityPolicy()
        self.logger = logging.getLogger(__name__)
        self.server_url = urlparse(url)
        self.name = "Pure Python Client"
        self.description = self.name
        self.application_uri = "urn:freeopcua:client"
        self.product_uri = "urn:freeopcua.github.no:client"
        self.security_policy = security_policy
        self.secure_channel_id = None
        self.default_timeout = 3600000
        self.secure_channel_timeout = self.default_timeout
        self.session_timeout = self.default_timeout
        self._policy_ids = []
        self._server_nonce = None  # set by create_session()
        self.server_certificate = b""
        self.client_certificate = b""
        self.private_key = b""
        self.bclient = BinaryClient(timeout, security_policy=security_policy)
        self._session_counter = 1
        self.keepalive = None

    def load_client_certificate(self, path):
        """
        load our certificate from file, either pem or der
        """
        _, ext = os.path.splitext(path)
        with open(path, "rb") as f:
            self.client_certificate = f.read()
        if ext == ".pem":
            # NOTE(review): "dem_to_der" looks like a typo for "pem_to_der",
            # confirm against the uacrypto module before renaming
            self.client_certificate = uacrypto.dem_to_der(self.client_certificate)

    def load_private_key(self, path):
        """
        load our private key from file, content is stored as read
        """
        with open(path, "rb") as f:
            self.private_key = f.read()

    def _call_on_temporary_channel(self, service):
        """
        Connect, open a secure channel, call service() and return its result.
        Channel and socket are always released, even if service() raises.
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            try:
                return service()
            finally:
                self.close_secure_channel()
        finally:
            self.disconnect_socket()

    def connect_and_get_server_endpoints(self):
        """
        Connect, ask server for endpoints, and disconnect
        """
        return self._call_on_temporary_channel(self.get_endpoints)

    def connect_and_find_servers(self):
        """
        Connect, ask server for a list of known servers, and disconnect
        A secure channel is opened although spec says it should not be necessary
        """
        return self._call_on_temporary_channel(self.find_servers)

    def connect_and_find_servers_on_network(self):
        """
        Connect, ask server for a list of known servers on network, and disconnect
        """
        return self._call_on_temporary_channel(self.find_servers_on_network)

    def connect(self):
        """
        High level method
        Connect, create and activate session
        User name and password are taken from the url, certificate
        from self.client_certificate
        """
        self.connect_socket()
        self.send_hello()
        self.open_secure_channel()
        self.create_session()
        self.activate_session(username=self.server_url.username, password=self.server_url.password, certificate=self.client_certificate)

    def disconnect(self):
        """
        High level method
        Close session, secure channel and socket
        """
        self.close_session()
        self.close_secure_channel()
        self.disconnect_socket()

    def connect_socket(self):
        """
        connect to socket defined in url
        """
        self.bclient.connect_socket(self.server_url.hostname, self.server_url.port)

    def disconnect_socket(self):
        """
        close socket to server
        """
        self.bclient.disconnect_socket()

    def send_hello(self):
        """
        Send OPC-UA hello to server
        """
        # FIXME check ack: the acknowledge returned by the server is not validated yet
        self.bclient.send_hello(self.server_url.geturl())

    def open_secure_channel(self, renew=False):
        """
        Open secure channel, if renew is True, renew channel
        self.secure_channel_timeout is sent as requested lifetime and
        updated with the lifetime revised by the server
        """
        params = ua.OpenSecureChannelParameters()
        params.ClientProtocolVersion = 0
        params.RequestType = ua.SecurityTokenRequestType.Issue
        if renew:
            params.RequestType = ua.SecurityTokenRequestType.Renew
        params.SecurityMode = self.security_policy.Mode
        params.RequestedLifetime = self.secure_channel_timeout
        # length should be equal to the length of key of symmetric encryption
        nonce = utils.create_nonce(self.security_policy.symmetric_key_size)
        params.ClientNonce = nonce  # this nonce is used to create a symmetric key
        result = self.bclient.open_secure_channel(params)
        self.security_policy.make_symmetric_key(nonce, result.ServerNonce)
        self.secure_channel_timeout = result.SecurityToken.RevisedLifetime

    def close_secure_channel(self):
        """
        Close secure channel, returns whatever the binary client returns
        """
        return self.bclient.close_secure_channel()

    def get_endpoints(self):
        """
        Ask server for its endpoints, our url is sent as EndpointUrl
        """
        params = ua.GetEndpointsParameters()
        params.EndpointUrl = self.server_url.geturl()
        return self.bclient.get_endpoints(params)

    def register_server(self, server, discovery_configuration=None):
        """
        register a server to discovery server
        if discovery_configuration is provided, the newer register_server2 service call is used
        """
        serv = ua.RegisteredServer()
        serv.ServerUri = server.application_uri
        serv.ProductUri = server.product_uri
        serv.DiscoveryUrls = [server.endpoint.geturl()]
        serv.ServerType = server.application_type
        serv.ServerNames = [ua.LocalizedText(server.name)]
        serv.IsOnline = True
        if discovery_configuration:
            params = ua.RegisterServer2Parameters()
            params.Server = serv
            params.DiscoveryConfiguration = discovery_configuration
            return self.bclient.register_server2(params)
        return self.bclient.register_server(serv)

    def find_servers(self, uris=None):
        """
        send a FindServer request to the server. The answer should be a list of
        servers the server knows about
        A list of uris can be provided, only server having matching uris will be returned
        """
        if uris is None:
            uris = []
        params = ua.FindServersParameters()
        params.EndpointUrl = self.server_url.geturl()
        params.ServerUris = uris
        return self.bclient.find_servers(params)

    def find_servers_on_network(self):
        """
        send a FindServersOnNetwork request with default parameters
        """
        params = ua.FindServersOnNetworkParameters()
        return self.bclient.find_servers_on_network(params)

    def create_session(self):
        """
        Create a session on server, verify server signature, remember the
        user token policies of the matching endpoint and start the
        keepalive thread. Returns the response of the server.
        """
        desc = ua.ApplicationDescription()
        desc.ApplicationUri = self.application_uri
        desc.ProductUri = self.product_uri
        desc.ApplicationName = ua.LocalizedText(self.name)
        desc.ApplicationType = ua.ApplicationType.Client

        params = ua.CreateSessionParameters()
        # at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
        nonce = utils.create_nonce(32)
        params.ClientNonce = nonce
        params.ClientCertificate = self.security_policy.client_certificate
        params.ClientDescription = desc
        params.EndpointUrl = self.server_url.geturl()
        params.SessionName = self.description + " Session" + str(self._session_counter)
        self._session_counter += 1  # so that the next session gets a distinct name
        # honour the public session_timeout member, as open_secure_channel()
        # does with secure_channel_timeout (same default: 3600000)
        params.RequestedSessionTimeout = self.session_timeout
        params.MaxResponseMessageSize = 0  # means no max size
        response = self.bclient.create_session(params)
        self.security_policy.asymmetric_cryptography.verify(self.security_policy.client_certificate + nonce, response.ServerSignature.Signature)
        self._server_nonce = response.ServerNonce
        self.server_certificate = response.ServerCertificate
        for ep in response.ServerEndpoints:
            if urlparse(ep.EndpointUrl).scheme == self.server_url.scheme and ep.SecurityMode == self.security_policy.Mode:
                # remember PolicyId's: we will use them in activate_session()
                self._policy_ids = ep.UserIdentityTokens
        self.session_timeout = response.RevisedSessionTimeout
        self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7)  # 0.7 is from spec
        self.keepalive.start()
        return response

    def server_policy_id(self, token_type, default):
        """
        Find PolicyId of server's UserTokenPolicy by token_type.
        Return default if there's no matching UserTokenPolicy.
        """
        for policy in self._policy_ids:
            if policy.TokenType == token_type:
                return policy.PolicyId
        return default

    def activate_session(self, username=None, password=None, certificate=None):
        """
        Activate session using either username and password or certificate
        and private_key. An anonymous token is used when neither username
        nor certificate is given; certificate wins if both are given.
        """
        params = ua.ActivateSessionParameters()
        challenge = self.security_policy.server_certificate + self._server_nonce
        params.ClientSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
        params.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(challenge)
        params.LocaleIds.append("en")
        if not username and not certificate:
            params.UserIdentityToken = ua.AnonymousIdentityToken()
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Anonymous, b"anonymous")
        elif certificate:
            params.UserIdentityToken = ua.X509IdentityToken()
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, b"certificate_basic256")
            params.UserIdentityToken.CertificateData = certificate
            sig = uacrypto.sign_sha1(self.private_key, certificate)
            params.UserTokenSignature = ua.SignatureData()
            params.UserTokenSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
            params.UserTokenSignature.Signature = sig
        else:
            params.UserIdentityToken = ua.UserNameIdentityToken()
            params.UserIdentityToken.UserName = username
            # test the password we were given, not the one in the url: they
            # differ when this method is called directly
            if password:
                pubkey = uacrypto.pubkey_from_dercert(self.server_certificate)
                # .encode() works on python2 too, bytes(str, encoding) does not
                data = uacrypto.encrypt_rsa_oaep(pubkey, password.encode("utf8"))
                params.UserIdentityToken.Password = data
            params.UserIdentityToken.PolicyId = self.server_policy_id(ua.UserTokenType.UserName, b"username_basic256")
            params.UserIdentityToken.EncryptionAlgorithm = 'http://www.w3.org/2001/04/xmlenc#rsa-oaep'
        return self.bclient.activate_session(params)

    def close_session(self):
        """
        Close session, the keepalive thread is asked to stop first
        """
        if self.keepalive:
            self.keepalive.stop()
        return self.bclient.close_session(True)

    def get_root_node(self):
        """
        Get the Node object of the root folder
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))

    def get_objects_node(self):
        """
        Get the Node object of the objects folder
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))

    def get_server_node(self):
        """
        Get the Node object of the server object
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.Server))

    def get_node(self, nodeid):
        """
        Get node using NodeId object or a string representing a NodeId
        """
        return Node(self.bclient, nodeid)

    def create_subscription(self, period, handler):
        """
        Create a subscription.
        returns a Subscription object which allow
        to subscribe to events or data on server
        period is sent as requested publishing interval
        handler argument is a class with data_change and/or event methods.
        These methods will be called when notification from server are received.
        See example-client.py.
        Do not do expensive/slow or network operation from these methods
        since they are called directly from receiving thread. This is a design choice,
        start another thread if you need to do such a thing.
        """
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = period
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 10000
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.bclient, params, handler)

    def get_namespace_array(self):
        """
        Read the value of the namespace array node of the server
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        return ns_node.get_value()

    def get_namespace_index(self, uri):
        """
        Return index of uri in the namespace array of the server,
        raises whatever index() raises when uri is not found
        """
        uris = self.get_namespace_array()
        return uris.index(uri)
367