Completed
Push — dev ( 7c3457...b4ea62 )
by Olivier
04:07
created

opcua.Client.open_secure_channel()   A

Complexity

Conditions 2

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 2.003
Metric Value
dl 0
loc 14
ccs 10
cts 11
cp 0.9091
rs 9.4286
cc 2
crap 2.003
1 1
from __future__ import division  # support for python2
2 1
from threading import Thread, Condition
3 1
import logging
4 1
try:
5 1
    from urllib.parse import urlparse
6
except ImportError:  # support for python2
7
    from urlparse import urlparse
8
9 1
from opcua import uaprotocol as ua
10 1
from opcua import BinaryClient, Node, Subscription
11 1
from opcua import utils
12
13
14 1
class KeepAlive(Thread):

    """
    Background thread used by Client to keep the session opened.

    OPCUA defines timeouts both for sessions and for the secure channel.
    Every ``timeout`` milliseconds this thread asks the client to renew its
    secure channel and then reads the server state node.
    """

    def __init__(self, client, timeout):
        """
        client is the object whose get_node() and open_secure_channel()
        methods are called from run().
        timeout is the renewal period in milliseconds; 0 is replaced by
        360000.
        """
        Thread.__init__(self)
        self.logger = logging.getLogger(__name__)
        if timeout == 0:  # means no timeout but we do not trust such servers
            timeout = 360000
        self.timeout = timeout
        self.client = client
        self._dostop = False
        # protects _dostop and lets stop() interrupt the wait in run()
        self._cond = Condition()

    def run(self):
        """
        Renew the secure channel periodically until stop() is called.
        """
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
        server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
        while True:
            with self._cond:
                # The flag is tested while holding the condition so that a
                # stop() issued just before the wait is not missed.
                if not self._dostop:
                    # Condition.wait() takes seconds, timeout is in milliseconds
                    self._cond.wait(self.timeout / 1000.0)
                if self._dostop:
                    break
            self.logger.debug("renewing channel")
            self.client.open_secure_channel(renew=True)
            val = server_state.get_value()
            self.logger.debug("server state is: %s ", val)
        self.logger.debug("keepalive thread has stopped")

    def stop(self):
        """
        Ask the thread to terminate and wake it up if it is waiting.
        """
        self.logger.debug("stoping keepalive thread")
        with self._cond:
            # Set the flag before notifying and under the lock, otherwise the
            # woken thread may still see False and start another full wait.
            self._dostop = True
            self._cond.notify_all()
50
51
52 1
class Client(object):

    """
    High level client to connect to an OPC-UA server.
    This class makes it easy to connect and browse address space.
    It attempts to expose as much functionality as possible
    but if you want to do special things you will probably need
    to work with the BinaryClient object, available as self.bclient
    which offers a raw OPC-UA interface.
    """

    def __init__(self, url, timeout=1):
        """
        Use the url argument to connect to the server.
        If you are unsure of the url, write at least hostname and port
        and call get_server_endpoints.
        timeout is the timeout to get an answer for requests to server;
        it is passed unchanged to BinaryClient.
        """
        self.logger = logging.getLogger(__name__)
        self.server_url = urlparse(url)
        self.name = "Pure Python Client"
        self.description = self.name
        self.application_uri = "urn:freeopcua:client"
        self.product_uri = "urn:freeopcua.github.no:client"
        self.security_policy_uri = "http://opcfoundation.org/UA/SecurityPolicy#None"
        self.security_mode = ua.MessageSecurityMode.None_
        self.secure_channel_id = None
        self.default_timeout = 3600000
        # both values are requested from the server and then overwritten
        # with the values the server actually granted
        self.secure_channel_timeout = self.default_timeout
        self.session_timeout = self.default_timeout
        # fallback policy ids, replaced in create_session() by the ones
        # advertised by the server for the current security mode
        self.policy_ids = {
            ua.UserTokenType.Anonymous: b'anonymous',
            ua.UserTokenType.UserName: b'user_name',
        }
        self.server_certificate = None
        self.bclient = BinaryClient(timeout)
        self._nonce = None
        self._session_counter = 1
        self.keepalive = None

    def _with_temporary_channel(self, request):
        """
        Connect, open a secure channel, call request() and return its result.
        The channel and the socket are always released, even if a step fails.
        """
        self.connect_socket()
        try:
            self.send_hello()
            self.open_secure_channel()
            try:
                return request()
            finally:
                self.close_secure_channel()
        finally:
            self.disconnect_socket()

    def get_server_endpoints(self):
        """
        Connect, ask server for endpoints, and disconnect
        """
        return self._with_temporary_channel(self.get_endpoints)

    def find_all_servers(self):
        """
        Connect, ask server for a list of known servers, and disconnect
        """
        return self._with_temporary_channel(self.find_servers)

    def connect(self):
        """
        High level method
        Connect, create and activate session
        """
        self.connect_socket()
        self.send_hello()
        self.open_secure_channel()
        self.create_session()
        self.activate_session()

    def disconnect(self):
        """
        High level method
        Close session, secure channel and socket.
        Each step is attempted even if the previous one raised, so the
        socket is not left open; the exception is still propagated.
        """
        try:
            self.close_session()
        finally:
            try:
                self.close_secure_channel()
            finally:
                self.disconnect_socket()

    def connect_socket(self):
        """
        connect to socket defined in url
        """
        self.bclient.connect_socket(self.server_url.hostname, self.server_url.port)

    def disconnect_socket(self):
        """
        Close the socket opened by connect_socket
        """
        self.bclient.disconnect_socket()

    def send_hello(self):
        """
        Send OPC-UA hello to server and return the acknowledge it answered
        """
        # FIXME check ack
        return self.bclient.send_hello(self.server_url.geturl())

    def open_secure_channel(self, renew=False):
        """
        Open secure channel, if renew is True, renew channel.
        The lifetime granted by the server is stored in
        self.secure_channel_timeout.
        """
        params = ua.OpenSecureChannelParameters()
        params.ClientProtocolVersion = 0
        if renew:
            params.RequestType = ua.SecurityTokenRequestType.Renew
        else:
            params.RequestType = ua.SecurityTokenRequestType.Issue
        params.SecurityMode = self.security_mode
        params.RequestedLifetime = self.secure_channel_timeout
        params.ClientNonce = '\x00'
        result = self.bclient.open_secure_channel(params)
        self.secure_channel_timeout = result.SecurityToken.RevisedLifetime

    def close_secure_channel(self):
        """
        Close the secure channel and return the answer of the binary client
        """
        return self.bclient.close_secure_channel()

    def get_endpoints(self):
        """
        Ask the server for its endpoints; needs an open secure channel
        """
        params = ua.GetEndpointsParameters()
        params.EndpointUrl = self.server_url.geturl()
        return self.bclient.get_endpoints(params)

    def find_servers(self):
        """
        Ask the server for the servers it knows; needs an open secure channel
        """
        params = ua.FindServersParameters()
        return self.bclient.find_servers(params)

    def create_session(self):
        """
        Create a session on the server and start the keepalive thread.
        Remembers the server certificate, the user token policy ids of the
        endpoints matching our security mode and the revised session timeout.
        Returns the server response.
        """
        desc = ua.ApplicationDescription()
        desc.ApplicationUri = self.application_uri
        desc.ProductUri = self.product_uri
        desc.ApplicationName = ua.LocalizedText(self.name)
        desc.ApplicationType = ua.ApplicationType.Client

        params = ua.CreateSessionParameters()
        params.ClientNonce = utils.create_nonce()
        params.ClientCertificate = b''
        params.ClientDescription = desc
        params.EndpointUrl = self.server_url.geturl()
        params.SessionName = self.description + " Session" + str(self._session_counter)
        # same pattern as open_secure_channel(): request the stored value
        params.RequestedSessionTimeout = self.session_timeout
        params.MaxResponseMessageSize = 0  # means no max size
        response = self.bclient.create_session(params)
        # next session created by this client gets a different name
        self._session_counter += 1
        self.server_certificate = response.ServerCertificate
        for ep in response.ServerEndpoints:
            if ep.SecurityMode == self.security_mode:
                # remember PolicyId's: we will use them in activate_session()
                for token in ep.UserIdentityTokens:
                    self.policy_ids[token.TokenType] = token.PolicyId
        self.session_timeout = response.RevisedSessionTimeout
        if self.keepalive:
            # do not leave a previous keepalive thread running
            self.keepalive.stop()
        # renew well before the shortest of the two timeouts expires
        period = min(self.session_timeout, self.secure_channel_timeout) * 0.7  # 0.7 is from spec
        self.keepalive = KeepAlive(self, period)
        self.keepalive.start()
        return response

    def activate_session(self):
        """
        Activate the session created by create_session.
        Uses an anonymous token unless the url contains a user name.
        Raises NotImplementedError if the url contains a password.
        """
        params = ua.ActivateSessionParameters()
        params.LocaleIds.append("en")
        if not self.server_url.username:
            params.UserIdentityToken = ua.AnonymousIdentityToken()
            params.UserIdentityToken.PolicyId = self.policy_ids[ua.UserTokenType.Anonymous]
        else:
            params.UserIdentityToken = ua.UserNameIdentityToken()
            params.UserIdentityToken.UserName = self.server_url.username
            if self.server_url.password:
                # TODO: encrypt the password with the public key of
                # self.server_certificate (RSA-OAEP) and store it in
                # params.UserIdentityToken.Password
                raise NotImplementedError("password authentication is not implemented")
            params.UserIdentityToken.PolicyId = self.policy_ids[ua.UserTokenType.UserName]
            params.UserIdentityToken.EncryptionAlgorithm = 'http://www.w3.org/2001/04/xmlenc#rsa-oaep'
        return self.bclient.activate_session(params)

    def close_session(self):
        """
        Stop the keepalive thread, if any, and close the session
        """
        if self.keepalive:
            self.keepalive.stop()
        return self.bclient.close_session(True)

    def get_root_node(self):
        """
        Return a Node object for the root folder
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))

    def get_objects_node(self):
        """
        Return a Node object for the objects folder
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))

    def get_server_node(self):
        """
        Return a Node object for the server object
        """
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.Server))

    def get_node(self, nodeid):
        """
        Get node using NodeId object or a string representing a NodeId
        """
        return Node(self.bclient, nodeid)

    def create_subscription(self, period, handler):
        """
        Create a subscription.
        Returns a Subscription object which allows
        to subscribe to events or data on server.
        period is used as the requested publishing interval.
        handler argument is a class with data_change and/or event methods.
        These methods will be called when notifications from server are received.
        See example-client.py.
        Do not do expensive/slow or network operation from these methods
        since they are called directly from receiving thread. This is a design choice,
        start another thread if you need to do such a thing.
        """
        params = ua.CreateSubscriptionParameters()
        params.RequestedPublishingInterval = period
        params.RequestedLifetimeCount = 3000
        params.RequestedMaxKeepAliveCount = 10000
        params.MaxNotificationsPerPublish = 4294967295
        params.PublishingEnabled = True
        params.Priority = 0
        return Subscription(self.bclient, params, handler)

    def get_namespace_array(self):
        """
        Return the value of the server's namespace array node
        """
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        return ns_node.get_value()

    def get_namespace_index(self, uri):
        """
        Return the position of uri in the namespace array.
        The lookup uses .index(), so an unknown uri raises ValueError
        when the array is a list.
        """
        uris = self.get_namespace_array()
        return uris.index(uri)
310