Completed
Push — dev ( df6b30...7c3457 )
by Olivier
03:06
created

opcua.Client.connect()   A

Complexity

Conditions 1

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 1
Metric Value
dl 0
loc 10
ccs 6
cts 6
cp 1
rs 9.4286
cc 1
crap 1
1 1
from __future__ import division  # support for python2
2 1
from threading import Thread, Condition
3 1
import logging
4 1
try:
5 1
    from urllib.parse import urlparse
6
except ImportError:  # support for python2
7
    from urlparse import urlparse
8
9 1
from opcua import uaprotocol as ua
10 1
from opcua import BinaryClient, Node, Subscription
11 1
from opcua import utils
12
13
14 1
class KeepAlive(Thread):
15
16
    """
17
    Used by Client to keep session opened.
18
    OPCUA defines timeout both for sessions and secure channel
19
    """
20
21 1
    def __init__(self, client, timeout):
22 1
        Thread.__init__(self)
23 1
        self.logger = logging.getLogger(__name__)
24 1
        if timeout == 0:  # means no timeout bu we do not trust such servers
25
            timeout = 360000
26 1
        self.timeout = timeout
27 1
        self.client = client
28 1
        self._dostop = False
29 1
        self._cond = Condition()
30
31 1
    def run(self):
32 1
        self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
33 1
        server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
34 1
        while not self._dostop:
35 1
            with self._cond:
36 1
                self._cond.wait(self.timeout / 1000)
37 1
            if self._dostop:
38 1
                break
39
            self.logger.debug("renewing channel")
40
            self.client.open_secure_channel(renew=True)
41
            val = server_state.get_value()
42
            self.logger.debug("server state is: %s ", val)
43 1
        self.logger.debug("keepalive thread has stopped")
44
45 1
    def stop(self):
46 1
        self.logger.debug("stoping keepalive thread")
47 1
        with self._cond:
48 1
            self._cond.notify_all()
49 1
        self._dostop = True
50
51
52 1
class Client(object):
53
54
    """
55
    High level client to connect to an OPC-UA server.
56
    This class makes it easy to connect and browse address space.
57
    It attemps to expose as much functionality as possible
58
    but if you want to do to special things you will probably need
59
    to work with the BinaryClient object, available as self.bclient
60
    which offers a raw OPC-UA interface.
61
    """
62
63 1
    def __init__(self, url, timeout=1):
64
        """
65
        used url argument to connect to server.
66
        if you are unsure of url, write at least hostname and port
67
        and call get_endpoints
68
        timeout is the timeout to get an answer for requests to server
69
        public member of this call are available to be set by API users
70
71
        """
72 1
        self.logger = logging.getLogger(__name__)
73 1
        self.server_url = urlparse(url)
74 1
        self.name = "Pure Python Client"
75 1
        self.description = self.name
76 1
        self.application_uri = "urn:freeopcua:client"
77 1
        self.product_uri = "urn:freeopcua.github.no:client"
78 1
        self.security_policy_uri = "http://opcfoundation.org/UA/SecurityPolicy#None"
79 1
        self.security_mode = ua.MessageSecurityMode.None_
80 1
        self.secure_channel_id = None
81 1
        self.default_timeout = 3600000
82 1
        self.secure_channel_timeout = self.default_timeout
83 1
        self.session_timeout = self.default_timeout
84 1
        self._policy_ids = []
85 1
        self.server_certificate = ""
86 1
        self.client_certificate = ""
87 1
        self.private_key = ""
88 1
        self.bclient = BinaryClient(timeout)
89 1
        self._nonce = None
90 1
        self._session_counter = 1
91 1
        self.keepalive = None
92
93 1
    def set_client_certificate(self, certificate):
94
        self.client_certificate = certificate
95
96 1
    def set_private_key(self, private_key):
97
        self.private_key = private_key
98
99 1
    def get_server_endpoints(self):
100
        """
101
        Connect, ask server for endpoints, and disconnect
102
        """
103
        self.connect_socket()
104
        self.send_hello()
105
        self.open_secure_channel()
106
        endpoints = self.get_endpoints()
107
        self.close_secure_channel()
108
        self.disconnect_socket()
109
        return endpoints
110
111 1
    def find_all_servers(self):
112
        """
113
        Connect, ask server for a list of known servers, and disconnect
114
        """
115
        self.connect_socket()
116
        self.send_hello()
117
        self.open_secure_channel()
118
        servers = self.find_servers()
119
        self.close_secure_channel()
120
        self.disconnect_socket()
121
        return servers
122
123 1
    def connect(self):
124
        """
125
        High level method
126
        Connect, create and activate session
127
        """
128 1
        self.connect_socket()
129 1
        self.send_hello()
130 1
        self.open_secure_channel()
131 1
        self.create_session()
132 1
        self.activate_session(username=self.server_url.username, private_key=self.private_key)
133
134 1
    def disconnect(self):
135
        """
136
        High level method
137
        Close session, secure channel and socket
138
        """
139 1
        self.close_session()
140 1
        self.close_secure_channel()
141 1
        self.disconnect_socket()
142
143 1
    def connect_socket(self):
144
        """
145
        connect to socket defined in url
146
        """
147 1
        self.bclient.connect_socket(self.server_url.hostname, self.server_url.port)
148
149 1
    def disconnect_socket(self):
150 1
        self.bclient.disconnect_socket()
151
152 1
    def send_hello(self):
153
        """
154
        Send OPC-UA hello to server
155
        """
156 1
        ack = self.bclient.send_hello(self.server_url.geturl())
157
        # FIXME check ack
158
159 1
    def open_secure_channel(self, renew=False):
160
        """
161
        Open secure channel, if renew is True, renew channel
162
        """
163 1
        params = ua.OpenSecureChannelParameters()
164 1
        params.ClientProtocolVersion = 0
165 1
        params.RequestType = ua.SecurityTokenRequestType.Issue
166 1
        if renew:
167
            params.RequestType = ua.SecurityTokenRequestType.Renew
168 1
        params.SecurityMode = self.security_mode
169 1
        params.RequestedLifetime = self.secure_channel_timeout
170 1
        params.ClientNonce = '\x00'
171 1
        result = self.bclient.open_secure_channel(params)
172 1
        self.secure_channel_timeout = result.SecurityToken.RevisedLifetime
173
174 1
    def close_secure_channel(self):
175 1
        return self.bclient.close_secure_channel()
176
177 1
    def get_endpoints(self):
178 1
        params = ua.GetEndpointsParameters()
179 1
        params.EndpointUrl = self.server_url.geturl()
180 1
        return self.bclient.get_endpoints(params)
181
182 1
    def find_servers(self):
183
        params = ua.FindServersParameters()
184
        return self.bclient.find_servers(params)
185
186 1
    def create_session(self):
187 1
        desc = ua.ApplicationDescription()
188 1
        desc.ApplicationUri = self.application_uri
189 1
        desc.ProductUri = self.product_uri
190 1
        desc.ApplicationName = ua.LocalizedText(self.name)
191 1
        desc.ApplicationType = ua.ApplicationType.Client
192
193 1
        params = ua.CreateSessionParameters()
194 1
        params.ClientNonce = utils.create_nonce()
195 1
        params.ClientCertificate = b''
196 1
        params.ClientDescription = desc
197 1
        params.EndpointUrl = self.server_url.geturl()
198 1
        params.SessionName = self.description + " Session" + str(self._session_counter)
199 1
        params.RequestedSessionTimeout = 3600000
200 1
        params.MaxResponseMessageSize = 0  # means no max size
201 1
        params.ClientCertificate = self.client_certificate
202 1
        response = self.bclient.create_session(params)
203 1
        self.server_certificate = response.ServerCertificate
204 1
        for ep in response.ServerEndpoints:
205 1
            if ep.SecurityMode == self.security_mode:
206
                # remember PolicyId's: we will use them in activate_session()
207 1
                self._policy_ids = ep.UserIdentityTokens
208 1
        self.session_timeout = response.RevisedSessionTimeout
209 1
        self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7)  # 0.7 is from spec
210 1
        self.keepalive.start()
211 1
        return response
212
213 1
    def activate_session(self, username=None, password=None, private_key=None):
214
        """
215
        Activate session using either username and password or private_key
216
        """
217 1
        params = ua.ActivateSessionParameters()
218 1
        params.LocaleIds.append("en")
219 1
        if not username and not private_key:
220 1
            params.UserIdentityToken = ua.AnonymousIdentityToken()
221 1
            params.UserIdentityToken.PolicyId = b"anonymous" 
222 1
        elif private_key:
223
            params.UserIdentityToken = ua.X509IdentityToken()
224
            params.UserIdentityToken.PolicyId = b"certificate_basic256"  # FIXME: add support for other types
225
            params.UserIdentityToken.CertificateData = private_key
226
        else:
227 1
            params.UserIdentityToken = ua.UserNameIdentityToken()
228 1
            params.UserIdentityToken.UserName = self.server_url.username 
229 1
            if self.server_url.password:
230
                raise NotImplementedError
231
                #p = bytes(self.server_url.password, "utf8")
232
                #p = self.server_url.password
233
                #from Crypto.Cipher import PKCS1_OAEP
234
                #from Crypto.PublicKey import RSA
235
                #from binascii import a2b_base64
236
                #from Crypto.Util.asn1 import DerSequence
237
                #from IPython import embed
238
                #import ssl
239
                #print("TYPE", type(self.server_certificate))
240
                #pem = self.server_certificate
241
                # Convert from PEM to DER
242
                #lines = str(pem).replace(" ",'').split()
243
                #data = ''.join(lines[1:-1])
244
                #embed()
245
                #3data = bytes(data, "utf8")
246
                #print("DATA", data)
247
                #der = a2b_base64(pem)
248
                #ssl.PEM_HEADER=""
249
                #ssl.PEM_FOOTER=""
250
                #der = ssl.PEM_cert_to_DER_cert(pem)
251
                #print("DER", der)
252
253
                # Extract subjectPublicKeyInfo field from X.509 certificate (see RFC3280)
254
                #cert = DerSequence()
255
                #cert.decode(der)
256
                #tbsCertificate = DerSequence()
257
                #tbsCertificate.decode(cert[0])
258
                #key = tbsCertificate[6]
259
                ##print("KEY2", key)
260
261
262
                #r = RSA.importKey(key)
263
                #cipher = PKCS1_OAEP.new(r)
264
                #ciphertext = cipher.encrypt(p)
265
                #params.UserIdentityToken.Password = ciphertext 
266
                #print("KKK", self.policy_ids[ua.UserTokenType.UserName])
267 1
            params.UserIdentityToken.PolicyId = b"user_name" 
268 1
            params.UserIdentityToken.EncryptionAlgorithm = 'http://www.w3.org/2001/04/xmlenc#rsa-oaep'
269 1
        return self.bclient.activate_session(params)
270
271 1
    def close_session(self):
272
        """
273
        Close session
274
        """
275 1
        if self.keepalive:
276 1
            self.keepalive.stop()
277 1
        return self.bclient.close_session(True)
278
279 1
    def get_root_node(self):
280 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))
281
282 1
    def get_objects_node(self):
283 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))
284
285 1
    def get_server_node(self):
286 1
        return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.Server))
287
288 1
    def get_node(self, nodeid):
289
        """
290
        Get node using NodeId object or a string representing a NodeId
291
        """
292 1
        return Node(self.bclient, nodeid)
293
294 1
    def create_subscription(self, period, handler):
295
        """
296
        Create a subscription.
297
        returns a Subscription object which allow
298
        to subscribe to events or data on server
299
        handler argument is a class with data_change and/or event methods.
300
        These methods will be called when notfication from server are received.
301
        See example-client.py.
302
        Do not do expensive/slow or network operation from these methods 
303
        since they are called directly from receiving thread. This is a design choice,
304
        start another thread if you need to do such a thing.
305
        """
306 1
        params = ua.CreateSubscriptionParameters()
307 1
        params.RequestedPublishingInterval = period
308 1
        params.RequestedLifetimeCount = 3000
309 1
        params.RequestedMaxKeepAliveCount = 10000
310 1
        params.MaxNotificationsPerPublish = 4294967295
311 1
        params.PublishingEnabled = True
312 1
        params.Priority = 0
313 1
        return Subscription(self.bclient, params, handler)
314
315 1
    def get_namespace_array(self):
316 1
        ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
317 1
        return ns_node.get_value()
318
319 1
    def get_namespace_index(self, uri):
320 1
        uries = self.get_namespace_array()
321
        return uries.index(uri)
322