Passed
Branch dev (949ba8)
by Olivier
03:17 queued 01:08
created

opcua.Client.activate_session()   B

Complexity

Conditions 5

Size

Total Lines 27

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 7.055
Metric Value
dl 0
loc 27
ccs 13
cts 23
cp 0.5652
rs 8.0896
cc 5
crap 7.055
1 1
from __future__ import division  # support for python2
2 1
import os
3 1
from threading import Thread, Condition
4 1
import logging
5 1
try:
6 1
    from urllib.parse import urlparse
7
except ImportError:  # support for python2
8
    from urlparse import urlparse
9
10 1
from opcua import uaprotocol as ua
11 1
from opcua import BinaryClient, Node, Subscription
12 1
from opcua import utils
13 1
from opcua import uacrypto
14
15
16 1
class KeepAlive(Thread):
17
18
    """
19
    Used by Client to keep session opened.
20
    OPCUA defines timeout both for sessions and secure channel
21
    """
22
23 1
    def __init__(self, client, timeout):
24 1
        Thread.__init__(self)
25 1
        self.logger = logging.getLogger(__name__)
26 1
        if timeout == 0:  # means no timeout bu we do not trust such servers
27
            timeout = 360000
28 1
        self.timeout = timeout
29 1
        self.client = client
30 1
        self._dostop = False
31 1
        self._cond = Condition()
32
33 1
    def run(self):
34 1
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
35 1
        server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
36 1
        while not self._dostop:
37 1
            with self._cond:
38 1
                self._cond.wait(self.timeout / 1000)
39 1
            if self._dostop:
40 1
                break
41
            self.logger.debug("renewing channel")
42
            self.client.open_secure_channel(renew=True)
43
            val = server_state.get_value()
44
            self.logger.debug("server state is: %s ", val)
45 1
        self.logger.debug("keepalive thread has stopped")
46
47 1
    def stop(self):
48 1
        self.logger.debug("stoping keepalive thread")
49 1
        with self._cond:
50 1
            self._cond.notify_all()
51 1
        self._dostop = True
52
53
54 1
class Client(object):
55
56
    """
57
    High level client to connect to an OPC-UA server.
58
    This class makes it easy to connect and browse address space.
59
    It attemps to expose as much functionality as possible
60
    but if you want to do to special things you will probably need
61
    to work with the BinaryClient object, available as self.bclient
62
    which offers a raw OPC-UA interface.
63
    """
64
65 1
    def __init__(self, url, timeout=1):
66
        """
67
        used url argument to connect to server.
68
        if you are unsure of url, write at least hostname and port
69
        and call get_endpoints
70
        timeout is the timeout to get an answer for requests to server
71
        public member of this call are available to be set by API users
72
73
        """
74 1
        self.logger = logging.getLogger(__name__)
75 1
        self.server_url = urlparse(url)
76 1
        self.name = "Pure Python Client"
77 1
        self.description = self.name
78 1
        self.application_uri = "urn:freeopcua:client"
79 1
        self.product_uri = "urn:freeopcua.github.no:client"
80 1
        self.security_policy_uri = "http://opcfoundation.org/UA/SecurityPolicy#None"
81 1
        self.security_mode = ua.MessageSecurityMode.None_
82 1
        self.secure_channel_id = None
83 1
        self.default_timeout = 3600000
84 1
        self.secure_channel_timeout = self.default_timeout
85 1
        self.session_timeout = self.default_timeout
86 1
        self._policy_ids = []
87 1
        self.server_certificate = ""
88 1
        self.client_certificate = ""
89 1
        self.private_key = ""
90 1
        self.bclient = BinaryClient(timeout)
91 1
        self._nonce = None
92 1
        self._session_counter = 1
93 1
        self.keepalive = None
94
95 1
    def load_client_certificate(self, path):
96
        """
97
        load our certificate from file, either pem or der
98
        """
99
        _, ext = os.path.splitext(path)
100
        with open(path, "br") as f:
101
            self.client_certificate = f.read()
102
        if ext == ".pem":
103
            self.client_certificate = uacrypto.dem_to_der(self.client_certificate)
104
105 1
    def load_private_key(self, path):
106
        with open(path, "br") as f:
107
            self.private_key = f.read()
108
109 1
    def get_server_endpoints(self):
110
        """
111
        Connect, ask server for endpoints, and disconnect
112
        """
113
        self.connect_socket()
114
        self.send_hello()
115
        self.open_secure_channel()
116
        endpoints = self.get_endpoints()
117
        self.close_secure_channel()
118
        self.disconnect_socket()
119
        return endpoints
120
121 1
    def find_all_servers(self):
122
        """
123
        Connect, ask server for a list of known servers, and disconnect
124
        """
125
        self.connect_socket()
126
        self.send_hello()
127
        self.open_secure_channel()
128
        servers = self.find_servers()
129
        self.close_secure_channel()
130
        self.disconnect_socket()
131
        return servers
132
133 1
    def connect(self):
134
        """
135
        High level method
136
        Connect, create and activate session
137
        """
138 1
        self.connect_socket()
139 1
        self.send_hello()
140 1
        self.open_secure_channel()
141 1
        self.create_session()
142 1
        self.activate_session(username=self.server_url.username, password=self.server_url.password, certificate=self.client_certificate)
143
144 1
    def disconnect(self):
145
        """
146
        High level method
147
        Close session, secure channel and socket
148
        """
149 1
        self.close_session()
150 1
        self.close_secure_channel()
151 1
        self.disconnect_socket()
152
153 1
    def connect_socket(self):
154
        """
155
        connect to socket defined in url
156
        """
157 1
        self.bclient.connect_socket(self.server_url.hostname, self.server_url.port)
158
159 1
    def disconnect_socket(self):
160 1
        self.bclient.disconnect_socket()
161
162 1
    def send_hello(self):
163
        """
164
        Send OPC-UA hello to server
165
        """
166 1
        ack = self.bclient.send_hello(self.server_url.geturl())
167
        # FIXME check ack
168
169 1
    def open_secure_channel(self, renew=False):
170
        """
171
        Open secure channel, if renew is True, renew channel
172
        """
173 1
        params = ua.OpenSecureChannelParameters()
174 1
        params.ClientProtocolVersion = 0
175 1
        params.RequestType = ua.SecurityTokenRequestType.Issue
176 1
        if renew:
177
            params.RequestType = ua.SecurityTokenRequestType.Renew
178 1
        params.SecurityMode = self.security_mode
179 1
        params.RequestedLifetime = self.secure_channel_timeout
180 1
        params.ClientNonce = '\x00'
181 1
        result = self.bclient.open_secure_channel(params)
182 1
        self.secure_channel_timeout = result.SecurityToken.RevisedLifetime
183
184 1
    def close_secure_channel(self):
185 1
        return self.bclient.close_secure_channel()
186
187 1
    def get_endpoints(self):
188 1
        params = ua.GetEndpointsParameters()
189 1
        params.EndpointUrl = self.server_url.geturl()
190 1
        return self.bclient.get_endpoints(params)
191
192 1
    def find_servers(self):
193
        params = ua.FindServersParameters()
194
        return self.bclient.find_servers(params)
195
196 1
    def create_session(self):
197 1
        desc = ua.ApplicationDescription()
198 1
        desc.ApplicationUri = self.application_uri
199 1
        desc.ProductUri = self.product_uri
200 1
        desc.ApplicationName = ua.LocalizedText(self.name)
201 1
        desc.ApplicationType = ua.ApplicationType.Client
202
203 1
        params = ua.CreateSessionParameters()
204 1
        params.ClientNonce = utils.create_nonce()
205 1
        params.ClientCertificate = b''
206 1
        params.ClientDescription = desc
207 1
        params.EndpointUrl = self.server_url.geturl()
208 1
        params.SessionName = self.description + " Session" + str(self._session_counter)
209 1
        params.RequestedSessionTimeout = 3600000
210 1
        params.MaxResponseMessageSize = 0  # means no max size
211 1
        params.ClientCertificate = self.client_certificate
212 1
        response = self.bclient.create_session(params)
213 1
        self.server_certificate = response.ServerCertificate
214 1
        for ep in response.ServerEndpoints:
215 1
            if ep.SecurityMode == self.security_mode:
216
                # remember PolicyId's: we will use them in activate_session()
217 1
                self._policy_ids = ep.UserIdentityTokens
218 1
        self.session_timeout = response.RevisedSessionTimeout
219 1
        self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7)  # 0.7 is from spec
220 1
        self.keepalive.start()
221 1
        return response
222
223 1
    def activate_session(self, username=None, password=None, certificate=None):
224
        """
225
        Activate session using either username and password or private_key
226
        """
227 1
        params = ua.ActivateSessionParameters()
228 1
        params.LocaleIds.append("en")
229 1
        if not username and not certificate:
230 1
            params.UserIdentityToken = ua.AnonymousIdentityToken()
231 1
            params.UserIdentityToken.PolicyId = b"anonymous" 
232 1
        elif certificate:
233
            params.UserIdentityToken = ua.X509IdentityToken()
234
            params.UserIdentityToken.PolicyId = b"certificate_basic256"  
235
            params.UserIdentityToken.CertificateData = certificate
236
            sig = uacrypto.sign_sha1(self.private_key, certificate)
237
            params.UserTokenSignature = ua.SignatureData()
238
            params.UserTokenSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
239
            params.UserTokenSignature.Signature = sig
240
        else:
241 1
            params.UserIdentityToken = ua.UserNameIdentityToken()
242 1
            params.UserIdentityToken.UserName = username 
243 1
            if self.server_url.password:
244
                pubkey = uacrypto.pubkey_from_dercert(self.server_certificate)
245
                data = uacrypto.encrypt_rsa_oaep(pubkey, bytes(password, "utf8"))
246
                params.UserIdentityToken.Password = data
247 1
            params.UserIdentityToken.PolicyId = b"username_basic256" 
248 1
            params.UserIdentityToken.EncryptionAlgorithm = 'http://www.w3.org/2001/04/xmlenc#rsa-oaep'
249 1
        return self.bclient.activate_session(params)
250
251 1
    def close_session(self):
252
        """
253
        Close session
254
        """
255 1
        if self.keepalive:
256 1
            self.keepalive.stop()
257 1
        return self.bclient.close_session(True)
258
259 1
    def get_root_node(self):
260 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))
261
262 1
    def get_objects_node(self):
263 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))
264
265 1
    def get_server_node(self):
266 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.Server))
267
268 1
    def get_node(self, nodeid):
269
        """
270
        Get node using NodeId object or a string representing a NodeId
271
        """
272 1
        return Node(self.bclient, nodeid)
273
274 1
    def create_subscription(self, period, handler):
275
        """
276
        Create a subscription.
277
        returns a Subscription object which allow
278
        to subscribe to events or data on server
279
        handler argument is a class with data_change and/or event methods.
280
        These methods will be called when notfication from server are received.
281
        See example-client.py.
282
        Do not do expensive/slow or network operation from these methods 
283
        since they are called directly from receiving thread. This is a design choice,
284
        start another thread if you need to do such a thing.
285
        """
286 1
        params = ua.CreateSubscriptionParameters()
287 1
        params.RequestedPublishingInterval = period
288 1
        params.RequestedLifetimeCount = 3000
289 1
        params.RequestedMaxKeepAliveCount = 10000
290 1
        params.MaxNotificationsPerPublish = 4294967295
291 1
        params.PublishingEnabled = True
292 1
        params.Priority = 0
293 1
        return Subscription(self.bclient, params, handler)
294
295 1
    def get_namespace_array(self):
296 1
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
297 1
        return ns_node.get_value()
298
299 1
    def get_namespace_index(self, uri):
300 1
        uries = self.get_namespace_array()
301
        return uries.index(uri)
302