Completed
Push — master ( 50d717...c5c8fe )
by Olivier
03:32
created

UaClient.create_monitored_items()   A

Complexity

Conditions 1

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 1

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 1
c 2
b 0
f 0
dl 0
loc 9
ccs 9
cts 9
cp 1
crap 1
rs 9.6666
1
"""
2
Low level binary client
3
"""
4
5 1
import logging
6 1
import socket
7 1
from threading import Thread, Lock
8 1
from concurrent.futures import Future
9 1
from functools import partial
10
11 1
from opcua import ua
12 1
from opcua.common import utils
13
14
15 1
class UASocketClient(object):
    """
    Handle the socket connection and send ua messages.

    timeout is the timeout (passed to Future.result) used while waiting
    for an ua answer from the server.
    """

    def __init__(self, timeout=1, security_policy=ua.SecurityPolicy()):
        # NOTE: the default security_policy object is created once, when the
        # class body is evaluated, and is therefore shared by every instance
        # built without an explicit policy.
        self.logger = logging.getLogger(__name__ + ".Socket")
        self._thread = None
        # taken around request bookkeeping and every _callbackmap access
        self._lock = Lock()
        self.timeout = timeout
        self._socket = None
        self._do_stop = False
        self.authentication_token = ua.NodeId()
        self._request_id = 0
        self._request_handle = 0
        # request id -> Future waiting for the matching answer
        self._callbackmap = {}
        self._connection = ua.SecureConnection(security_policy)

    def start(self):
        """
        Start receiving thread.
        this is called automatically in connect and
        should not be necessary to call directly
        """
        self._thread = Thread(target=self._run)
        self._thread.start()

    def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server, lower-level method
        timeout is the timeout written in ua header
        returns the Future that will receive the raw answer body;
        callback, if given, is attached to that Future as a done-callback
        """
        with self._lock:
            request.RequestHeader = self._create_request_header(timeout)
            self.logger.debug("Sending: %s", request)
            try:
                binreq = request.to_binary()
            except Exception:
                # reset request handle if any error
                # see self._create_request_header
                self._request_handle -= 1
                raise
            self._request_id += 1
            future = Future()
            if callback:
                future.add_done_callback(callback)
            # register before writing so the receive thread can find the future
            self._callbackmap[self._request_id] = future
            msg = self._connection.message_to_binary(binreq, message_type, self._request_id)
            self._socket.write(msg)
        return future

    def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
        """
        send request to server.
        timeout is the timeout written in ua header
        returns response object if no callback is provided,
        otherwise returns None and the answer is delivered to callback
        """
        future = self._send_request(request, callback, timeout, message_type)
        if not callback:
            # blocks up to self.timeout waiting for the receive thread
            data = future.result(self.timeout)
            self.check_answer(data, " in response to " + request.__class__.__name__)
            return data

    def check_answer(self, data, context):
        """
        Inspect the type id of an answer without consuming the caller's buffer.

        Returns True when the answer is not a ServiceFault. For a ServiceFault
        a warning is logged and the header's ServiceResult.check() is called
        (presumably raising for a bad status); False is returned otherwise.
        """
        data = data.copy()  # parse a copy so the caller can still decode data
        typeid = ua.NodeId.from_binary(data)
        if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
            self.logger.warning("ServiceFault from server received %s", context)
            hdr = ua.ResponseHeader.from_binary(data)
            hdr.ServiceResult.check()
            return False
        return True

    def _run(self):
        """
        Body of the receiving thread: call _receive until a stop is
        requested or the socket reports it has been closed.
        """
        self.logger.info("Thread started")
        while not self._do_stop:
            try:
                self._receive()
            except ua.utils.SocketClosedException:
                self.logger.info("Socket has closed connection")
                break
        self.logger.info("Thread ended")

    def _receive(self):
        """
        Read one message from the connection and dispatch it to the
        future registered for it.
        """
        msg = self._connection.receive_from_socket(self._socket)
        if msg is None:
            return
        elif isinstance(msg, ua.Message):
            self._call_callback(msg.request_id(), msg.body())
        elif isinstance(msg, ua.Acknowledge):
            # the Hello future is registered under id 0, see send_hello
            self._call_callback(0, msg)
        elif isinstance(msg, ua.ErrorMessage):
            self.logger.warning("Received an error: %s", msg)
        else:
            # exceptions do not interpolate logging-style args: format here
            raise ua.UaError("Unsupported message type: {}".format(msg))

    def _call_callback(self, request_id, body):
        """
        Resolve the future registered for request_id with body.
        Raises ua.UaError if no future is registered for that id.
        """
        with self._lock:
            future = self._callbackmap.pop(request_id, None)
            if future is None:
                raise ua.UaError("No future object found for request: {}, callbacks in list are {}".format(request_id, self._callbackmap.keys()))
        # set the result outside the lock: done-callbacks run here
        future.set_result(body)

    def _create_request_header(self, timeout=1000):
        """
        Build a RequestHeader carrying the authentication token, a freshly
        incremented request handle and timeout as TimeoutHint.
        """
        hdr = ua.RequestHeader()
        hdr.AuthenticationToken = self.authentication_token
        self._request_handle += 1
        hdr.RequestHandle = self._request_handle
        hdr.TimeoutHint = timeout
        return hdr

    def connect_socket(self, host, port):
        """
        connect to server socket and start receiving thread
        """
        self.logger.info("opening connection")
        sock = socket.create_connection((host, port))
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # nodelay necessary to avoid packing in one frame, some servers do not like it
        self._socket = utils.SocketWrapper(sock)
        self.start()

    def disconnect_socket(self):
        """
        Ask the receiving thread to stop, then shut down and close the socket.
        """
        self.logger.info("stop request")
        self._do_stop = True
        self._socket.socket.shutdown(socket.SHUT_WR)
        self._socket.socket.close()

    def send_hello(self, url):
        """
        Send a Hello message for url and return the answer delivered by the
        receiving thread, waiting at most self.timeout.
        """
        hello = ua.Hello()
        hello.EndpointUrl = url
        future = Future()
        with self._lock:
            # id 0 is the slot _receive uses for the Acknowledge message
            self._callbackmap[0] = future
        binmsg = self._connection.tcp_to_binary(ua.MessageType.Hello, hello)
        self._socket.write(binmsg)
        ack = future.result(self.timeout)
        return ack

    def open_secure_channel(self, params):
        """
        Send an OpenSecureChannelRequest with params, install the returned
        security token on the connection and return the response parameters.
        """
        self.logger.info("open_secure_channel")
        request = ua.OpenSecureChannelRequest()
        request.Parameters = params
        future = self._send_request(request, message_type=ua.MessageType.SecureOpen)

        response = ua.OpenSecureChannelResponse.from_binary(future.result(self.timeout))
        response.ResponseHeader.ServiceResult.check()
        self._connection.set_security_token(response.Parameters.SecurityToken)
        return response.Parameters

    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepared to reconnect.
        OPC UA specs Part 6, 7.1.4 say that Server does not send a CloseSecureChannel response and should just close socket
        """
        self.logger.info("close_secure_channel")
        request = ua.CloseSecureChannelRequest()
        future = self._send_request(request, message_type=ua.MessageType.SecureClose)
        with self._lock:
            # don't expect any more answers
            future.cancel()
            self._callbackmap.clear()

        # some servers send a response here, most do not ... so we ignore
180
181
182 1
class UaClient(object):

    """
    low level OPC-UA client.

    It implements (almost) all methods defined in opcua spec
    taking in argument the structures defined in opcua spec.

    In this Python implementation  most of the structures are defined in
    uaprotocol_auto.py and uaprotocol_hand.py available under opcua.ua

    Unless stated otherwise every service method builds the matching
    request, sends it through the socket client, decodes the matching
    response, calls ResponseHeader.ServiceResult.check() on it and
    returns the part of the response named in its return statement.
    """

    def __init__(self, timeout=1):
        self.logger = logging.getLogger(__name__)
        # _publishcallbacks should be accessed in recv thread only
        self._publishcallbacks = {}
        self._timeout = timeout
        self._uasocket = None
        self._security_policy = ua.SecurityPolicy()

    def set_security(self, policy):
        """
        Store the security policy handed to the socket client created by
        the next connect_socket call.
        """
        self._security_policy = policy

    def connect_socket(self, host, port):
        """
        connect to server socket and start receiving thread
        """
        self._uasocket = UASocketClient(self._timeout, security_policy=self._security_policy)
        return self._uasocket.connect_socket(host, port)

    def disconnect_socket(self):
        """
        Delegate to the socket client's disconnect_socket.
        """
        return self._uasocket.disconnect_socket()

    def send_hello(self, url):
        """
        Delegate to the socket client's send_hello and return its answer.
        """
        return self._uasocket.send_hello(url)

    def open_secure_channel(self, params):
        """
        Delegate to the socket client's open_secure_channel.
        """
        return self._uasocket.open_secure_channel(params)

    def close_secure_channel(self):
        """
        close secure channel. It seems to trigger a shutdown of socket
        in most servers, so be prepared to reconnect
        """
        return self._uasocket.close_secure_channel()

    def create_session(self, parameters):
        """
        Create a session and remember the returned authentication token on
        the socket client so later request headers carry it.
        """
        self.logger.info("create_session")
        request = ua.CreateSessionRequest()
        request.Parameters = parameters
        data = self._uasocket.send_request(request)
        response = ua.CreateSessionResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        self._uasocket.authentication_token = response.Parameters.AuthenticationToken
        return response.Parameters

    def activate_session(self, parameters):
        """
        Send an ActivateSessionRequest and return the response parameters.
        """
        self.logger.info("activate_session")
        request = ua.ActivateSessionRequest()
        request.Parameters = parameters
        data = self._uasocket.send_request(request)
        response = ua.ActivateSessionResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters

    def close_session(self, deletesubscriptions):
        """
        Send a CloseSessionRequest. The response is decoded but its
        service result is deliberately not checked (see comment below).
        """
        self.logger.info("close_session")
        request = ua.CloseSessionRequest()
        request.DeleteSubscriptions = deletesubscriptions
        data = self._uasocket.send_request(request)
        ua.CloseSessionResponse.from_binary(data)
        # response.ResponseHeader.ServiceResult.check() #disabled, it seems we sent wrong session Id, but where is the sessionId supposed to be sent???

    def browse(self, parameters):
        """
        Send a BrowseRequest and return the response results.
        """
        self.logger.info("browse")
        request = ua.BrowseRequest()
        request.Parameters = parameters
        data = self._uasocket.send_request(request)
        response = ua.BrowseResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def read(self, parameters):
        """
        Send a ReadRequest and return the response results.

        Results for NodeClass and ValueRank attributes with a good status
        are converted to the ua.NodeClass / ua.ValueRank enums; ValueRank
        is only converted for the values -3..4.
        """
        self.logger.info("read")
        request = ua.ReadRequest()
        request.Parameters = parameters
        data = self._uasocket.send_request(request)
        response = ua.ReadResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        # cast to Enum attributes that need to
        for idx, rv in enumerate(parameters.NodesToRead):
            if rv.AttributeId == ua.AttributeIds.NodeClass:
                dv = response.Results[idx]
                if dv.StatusCode.is_good():
                    dv.Value.Value = ua.NodeClass(dv.Value.Value)
            elif rv.AttributeId == ua.AttributeIds.ValueRank:
                dv = response.Results[idx]
                if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
                    dv.Value.Value = ua.ValueRank(dv.Value.Value)
        return response.Results

    def write(self, params):
        """
        Send a WriteRequest and return the response results.
        """
        self.logger.info("write")
        request = ua.WriteRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.WriteResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def get_endpoints(self, params):
        """
        Send a GetEndpointsRequest and return the response endpoints.
        """
        self.logger.info("get_endpoint")
        request = ua.GetEndpointsRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.GetEndpointsResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Endpoints

    def find_servers(self, params):
        """
        Send a FindServersRequest and return the response servers.
        """
        self.logger.info("find_servers")
        request = ua.FindServersRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.FindServersResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Servers

    def find_servers_on_network(self, params):
        """
        Send a FindServersOnNetworkRequest and return the response parameters.
        """
        self.logger.info("find_servers_on_network")
        request = ua.FindServersOnNetworkRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.FindServersOnNetworkResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Parameters

    def register_server(self, registered_server):
        """
        Send a RegisterServerRequest for registered_server.
        """
        self.logger.info("register_server")
        request = ua.RegisterServerRequest()
        request.Server = registered_server
        data = self._uasocket.send_request(request)
        response = ua.RegisterServerResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        # nothing to return for this service

    def register_server2(self, params):
        """
        Send a RegisterServer2Request and return the configuration results.
        """
        self.logger.info("register_server2")
        request = ua.RegisterServer2Request()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.RegisterServer2Response.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.ConfigurationResults

    def translate_browsepaths_to_nodeids(self, browsepaths):
        """
        Send a TranslateBrowsePathsToNodeIdsRequest for browsepaths and
        return the response results.
        """
        self.logger.info("translate_browsepath_to_nodeid")
        request = ua.TranslateBrowsePathsToNodeIdsRequest()
        request.Parameters.BrowsePaths = browsepaths
        data = self._uasocket.send_request(request)
        response = ua.TranslateBrowsePathsToNodeIdsResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def create_subscription(self, params, callback):
        """
        Create a subscription and register callback for its publish results.

        The request is sent with a done-callback so that the registration in
        _publishcallbacks happens in the thread delivering the answer; this
        method then waits up to the client timeout for the response
        parameters and returns them.
        """
        self.logger.info("create_subscription")
        request = ua.CreateSubscriptionRequest()
        request.Parameters = params
        resp_fut = Future()
        mycallbak = partial(self._create_subscription_callback, callback, resp_fut)
        self._uasocket.send_request(request, mycallbak)
        return resp_fut.result(self._timeout)

    def _create_subscription_callback(self, pub_callback, resp_fut, data_fut):
        """
        Done-callback for create_subscription: decode the answer, store
        pub_callback under the new subscription id and resolve resp_fut.
        """
        self.logger.info("_create_subscription_callback")
        data = data_fut.result()
        response = ua.CreateSubscriptionResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        self._publishcallbacks[response.Parameters.SubscriptionId] = pub_callback
        resp_fut.set_result(response.Parameters)

    def delete_subscriptions(self, subscriptionids):
        """
        Delete the given subscriptions and return the response results,
        waiting up to the client timeout for the answer.
        """
        self.logger.info("delete_subscription")
        request = ua.DeleteSubscriptionsRequest()
        request.Parameters.SubscriptionIds = subscriptionids
        resp_fut = Future()
        mycallbak = partial(self._delete_subscriptions_callback, subscriptionids, resp_fut)
        self._uasocket.send_request(request, mycallbak)
        return resp_fut.result(self._timeout)

    def _delete_subscriptions_callback(self, subscriptionids, resp_fut, data_fut):
        """
        Done-callback for delete_subscriptions: decode the answer, forget
        the publish callbacks of the deleted ids and resolve resp_fut.
        """
        self.logger.info("_delete_subscriptions_callback")
        data = data_fut.result()
        response = ua.DeleteSubscriptionsResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        for sid in subscriptionids:
            # tolerate ids that were never registered: an exception raised
            # here would be swallowed by the Future machinery and leave
            # resp_fut unresolved until the caller times out
            self._publishcallbacks.pop(sid, None)
        resp_fut.set_result(response.Results)

    def publish(self, acks=None):
        """
        Send a PublishRequest carrying acks (an empty list when omitted).
        The answer is handled asynchronously by _call_publish_callback.
        """
        self.logger.info("publish")
        if acks is None:
            acks = []
        request = ua.PublishRequest()
        request.Parameters.SubscriptionAcknowledgements = acks
        self._uasocket.send_request(request, self._call_publish_callback, timeout=int(9e8))  # timeout could be set to 0 but some servers do not support it

    def _call_publish_callback(self, future):
        """
        Done-callback for publish: decode the PublishResponse and hand its
        parameters to the callback registered for the subscription id.
        Exceptions raised by that user callback are logged, not propagated.
        """
        self.logger.info("call_publish_callback")
        data = future.result()
        self._uasocket.check_answer(data, "ServiceFault received from server while waiting for publish response")
        try:
            response = ua.PublishResponse.from_binary(data)
            self.logger.debug(response)
        except Exception:
            self.logger.exception("Error parsing notificatipn from server")
            # re-issue a publish request (presumably so the server keeps
            # sending notifications despite the one we could not parse)
            self.publish([])
            return
        if response.Parameters.SubscriptionId not in self._publishcallbacks:
            self.logger.warning("Received data for unknown subscription: %s ", response.Parameters.SubscriptionId)
            return
        callback = self._publishcallbacks[response.Parameters.SubscriptionId]
        try:
            callback(response.Parameters)
        except Exception:  # we call client code, catch everything!
            self.logger.exception("Exception while calling user callback: %s", callback)

    def create_monitored_items(self, params):
        """
        Send a CreateMonitoredItemsRequest and return the response results.
        """
        self.logger.info("create_monitored_items")
        request = ua.CreateMonitoredItemsRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.CreateMonitoredItemsResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def delete_monitored_items(self, params):
        """
        Send a DeleteMonitoredItemsRequest and return the response results.
        """
        self.logger.info("delete_monitored_items")
        request = ua.DeleteMonitoredItemsRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.DeleteMonitoredItemsResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def add_nodes(self, nodestoadd):
        """
        Send an AddNodesRequest for nodestoadd and return the response results.
        """
        self.logger.info("add_nodes")
        request = ua.AddNodesRequest()
        request.Parameters.NodesToAdd = nodestoadd
        data = self._uasocket.send_request(request)
        response = ua.AddNodesResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def delete_nodes(self, nodestodelete):
        """
        Send a DeleteNodesRequest for nodestodelete and return the response
        results.
        """
        self.logger.info("delete_nodes")
        request = ua.DeleteNodesRequest()
        request.Parameters.NodesToDelete = nodestodelete
        data = self._uasocket.send_request(request)
        response = ua.DeleteNodesResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def call(self, methodstocall):
        """
        Send a CallRequest for methodstocall and return the response results.
        """
        self.logger.info("call")
        request = ua.CallRequest()
        request.Parameters.MethodsToCall = methodstocall
        data = self._uasocket.send_request(request)
        response = ua.CallResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def history_read(self, params):
        """
        Send a HistoryReadRequest and return the response results.
        """
        self.logger.info("history_read")
        request = ua.HistoryReadRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.HistoryReadResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results

    def modify_monitored_items(self, params):
        """
        Send a ModifyMonitoredItemsRequest and return the response results.
        """
        self.logger.info("modify_monitored_items")
        request = ua.ModifyMonitoredItemsRequest()
        request.Parameters = params
        data = self._uasocket.send_request(request)
        response = ua.ModifyMonitoredItemsResponse.from_binary(data)
        self.logger.debug(response)
        response.ResponseHeader.ServiceResult.check()
        return response.Results
490