Test Failed
Pull Request — master (#620)
by
unknown
07:46
created

UaProcessor.send_response()   A

Complexity

Conditions 2

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
c 0
b 0
f 0
dl 0
loc 7
ccs 5
cts 5
cp 1
crap 2
rs 9.4285
1
2 1
import logging
3 1
from threading import RLock, Lock
4 1
import time
5
6 1
from opcua import ua
7 1
from opcua.ua.ua_binary import nodeid_from_binary, struct_from_binary
8 1
from opcua.ua.ua_binary import struct_to_binary, uatcp_to_binary
9 1
from opcua.common import utils
10 1
from opcua.common.connection import SecureConnection
11
12
13 1
class PublishRequestData(object):
    """Headers of a received PublishRequest, stored until it can be answered.

    The three header attributes start as ``None`` and are filled in by the
    code that queues the request. ``timestamp`` is the creation time as
    returned by ``time.time()``.
    """

    def __init__(self):
        self.requesthdr = None
        self.algohdr = None
        self.seqhdr = None
        # Creation time in seconds since the epoch.
        self.timestamp = time.time()
20
21
22 1
class UaProcessor(object):
23
24 1
    def __init__(self, internal_server, socket):
        """
        Set up the per-connection state for one client.

        internal_server: server object; stored as ``self.iserver`` and used
            for session creation and discovery requests.
        socket: transport object; must provide ``get_extra_info()`` and
            ``write()``.
        """
        self.logger = logging.getLogger(__name__)
        self.iserver = internal_server
        self.name = socket.get_extra_info('peername')
        self.sockname = socket.get_extra_info('sockname')
        self.session = None  # set when a CreateSessionRequest is processed
        self.socket = socket
        self._socketlock = Lock()  # held while a response is encoded and written
        self._datalock = RLock()  # held while the two publish queues are modified
        self._publishdata_queue = []  # PublishRequestData waiting for a result
        self._publish_result_queue = []  # used when we need to wait for PublishRequest
        self._connection = SecureConnection(ua.SecurityPolicy())
36
37 1
    def set_policies(self, policies):
        """
        Hand the given security policy factories to the secure connection.
        """
        self._connection.set_policy_factories(policies)
39
40 1
    def send_response(self, requesthandle, algohdr, seqhdr, response, msgtype=ua.MessageType.SecureMessage):
        """
        Encode ``response`` and write it to the client socket.

        requesthandle: copied into ``response.ResponseHeader.RequestHandle``.
        algohdr: security header passed on to the connection encoder.
        seqhdr: sequence header of the request; its ``RequestId`` is reused.
        response: response structure to serialise.
        msgtype: message type of the outgoing message.

        The socket lock is held for the whole encode-and-write sequence.
        """
        with self._socketlock:
            response.ResponseHeader.RequestHandle = requesthandle
            data = self._connection.message_to_binary(
                struct_to_binary(response), message_type=msgtype, request_id=seqhdr.RequestId, algohdr=algohdr)
            self.socket.write(data)
47
48 1
    def open_secure_channel(self, algohdr, seqhdr, body):
        """
        Process an OpenSecureChannelRequest and send the matching response.

        The security policy is selected from the values in ``algohdr`` and
        the requested security mode before the channel is opened.
        """
        request = struct_from_binary(ua.OpenSecureChannelRequest, body)

        self._connection.select_policy(
            algohdr.SecurityPolicyURI, algohdr.SenderCertificate, request.Parameters.SecurityMode)

        channel = self._connection.open(request.Parameters, self.iserver)
        # send response
        response = ua.OpenSecureChannelResponse()
        response.Parameters = channel
        self.send_response(request.RequestHeader.RequestHandle, None, seqhdr, response, ua.MessageType.SecureOpen)
59
60 1
    def forward_publish_response(self, result):
        """
        Send ``result`` as a PublishResponse to the oldest usable PublishRequest.

        Queued requests are taken in arrival order. A request is usable when
        its ``TimeoutHint`` is 0 or when it is younger than ``TimeoutHint``
        milliseconds; other requests are dropped from the queue. If no
        request is queued, ``result`` is stored in the result queue and
        nothing is sent.
        """
        self.logger.info("forward publish response %s", result)
        with self._datalock:
            while True:
                if not self._publishdata_queue:
                    self._publish_result_queue.append(result)
                    self.logger.info("Server wants to send publish answer but no publish request is available,"
                                     "enqueing notification, length of result queue is %s",
                                     len(self._publish_result_queue))
                    return
                requestdata = self._publishdata_queue.pop(0)
                timeout_ms = requestdata.requesthdr.TimeoutHint
                # Divide by a float: integer division on Python 2 would turn
                # every hint below 1000 ms into 0 and drop the request.
                if timeout_ms == 0 or time.time() - requestdata.timestamp < timeout_ms / 1000.0:
                    break

        response = ua.PublishResponse()
        response.Parameters = result

        self.send_response(requestdata.requesthdr.RequestHandle, requestdata.algohdr, requestdata.seqhdr, response)
78
79 1
    def process(self, header, body):
        """
        Process one received chunk, given as its header and body.

        Returns False after a SecureClose message, the result of
        process_message() for a SecureMessage, and True otherwise.
        Raises utils.ServiceError(BadTcpMessageTypeInvalid) when the decoded
        message is of none of the handled kinds.
        """
        msg = self._connection.receive_from_header_and_body(header, body)
        if isinstance(msg, ua.Message):
            if header.MessageType == ua.MessageType.SecureOpen:
                self.open_secure_channel(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())

            elif header.MessageType == ua.MessageType.SecureClose:
                self._connection.close()
                return False

            elif header.MessageType == ua.MessageType.SecureMessage:
                return self.process_message(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())
        elif isinstance(msg, ua.Hello):
            # Acknowledge with the buffer sizes the client proposed.
            ack = ua.Acknowledge()
            ack.ReceiveBufferSize = msg.ReceiveBufferSize
            ack.SendBufferSize = msg.SendBufferSize
            data = uatcp_to_binary(ua.MessageType.Acknowledge, ack)
            self.socket.write(data)
        elif isinstance(msg, ua.ErrorMessage):
            self.logger.warning("Received an error message type")
        elif msg is None:
            pass  # msg is a ChunkType.Intermediate of an ua.MessageType.SecureMessage
        else:
            self.logger.warning("Unsupported message type: %s", header.MessageType)
            raise utils.ServiceError(ua.StatusCodes.BadTcpMessageTypeInvalid)
        return True
105
106 1
    def process_message(self, algohdr, seqhdr, body):
        """
        Decode the type id and request header from ``body`` and dispatch.

        A utils.ServiceError raised during dispatch is answered with a
        ServiceFault carrying the error's status code, and True is returned.
        Otherwise the result of _process_message() is returned.
        """
        typeid = nodeid_from_binary(body)
        requesthdr = struct_from_binary(ua.RequestHeader, body)
        try:
            return self._process_message(typeid, requesthdr, algohdr, seqhdr, body)
        except utils.ServiceError as e:
            status = ua.StatusCode(e.code)
            response = ua.ServiceFault()
            response.ResponseHeader.ServiceResult = status
            self.logger.info("sending service fault response: %s (%s)", status.doc, status.name)
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
            return True
118
119 1
    def _process_message(self, typeid, requesthdr, algohdr, seqhdr, body):
        """
        Handle one service request identified by ``typeid``.

        The request parameters are decoded from ``body``, the matching
        session or server method is called, and the response is sent with
        send_response(). A PublishRequest is only queued here; its response
        is sent later by forward_publish_response().

        Returns False for a PublishRequest received without a session and
        after a CloseSecureChannelRequest; True otherwise.
        Raises utils.ServiceError(BadSessionIdInvalid) when a session is
        activated before being created, and
        utils.ServiceError(BadNotImplemented) for an unknown ``typeid``.
        """
        if typeid == ua.NodeId(ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Create session request")
            params = struct_from_binary(ua.CreateSessionParameters, body)

            # create the session on server
            self.session = self.iserver.create_session(self.name, external=True)
            # get a session creation result to send back
            sessiondata = self.session.create_session(params, sockname=self.sockname)

            response = ua.CreateSessionResponse()
            response.Parameters = sessiondata
            response.Parameters.ServerCertificate = self._connection.security_policy.client_certificate
            if self._connection.security_policy.server_certificate is None:
                data = params.ClientNonce
            else:
                data = self._connection.security_policy.server_certificate + params.ClientNonce
            response.Parameters.ServerSignature.Signature = \
                self._connection.security_policy.asymmetric_cryptography.signature(data)

            response.Parameters.ServerSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"

            self.logger.info("sending create session response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Close session request")
            deletesubs = ua.ua_binary.Primitives.Boolean.unpack(body)

            self.session.close_session(deletesubs)

            response = ua.CloseSessionResponse()
            self.logger.info("sending close session response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Activate session request")
            params = struct_from_binary(ua.ActivateSessionParameters, body)

            if not self.session:
                self.logger.info("request to activate non-existing session")
                raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)

            if self._connection.security_policy.client_certificate is None:
                data = self.session.nonce
            else:
                data = self._connection.security_policy.client_certificate + self.session.nonce
            self._connection.security_policy.asymmetric_cryptography.verify(data, params.ClientSignature.Signature)

            # The outcome is only logged: an untrusted client is not rejected.
            if self._user_certificate_is_trusted(params, data):
                self.logger.info('Verification of signature with a trusted certificate is confirmed')
            else:
                self.logger.warning('An untrusted client activates a session with the server!!!')

            result = self.session.activate_session(params)

            response = ua.ActivateSessionResponse()
            response.Parameters = result

            self.logger.info("sending activate session response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ReadRequest_Encoding_DefaultBinary):
            self.logger.info("Read request")
            params = struct_from_binary(ua.ReadParameters, body)

            results = self.session.read(params)

            response = ua.ReadResponse()
            response.Results = results

            self.logger.info("sending read response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.WriteRequest_Encoding_DefaultBinary):
            self.logger.info("Write request")
            params = struct_from_binary(ua.WriteParameters, body)

            results = self.session.write(params)

            response = ua.WriteResponse()
            response.Results = results

            self.logger.info("sending write response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.BrowseRequest_Encoding_DefaultBinary):
            self.logger.info("Browse request")
            params = struct_from_binary(ua.BrowseParameters, body)

            results = self.session.browse(params)

            response = ua.BrowseResponse()
            response.Results = results

            self.logger.info("sending browse response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary):
            self.logger.info("get endpoints request")
            params = struct_from_binary(ua.GetEndpointsParameters, body)

            endpoints = self.iserver.get_endpoints(params, sockname=self.sockname)

            response = ua.GetEndpointsResponse()
            response.Endpoints = endpoints

            self.logger.info("sending get endpoints response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.FindServersRequest_Encoding_DefaultBinary):
            self.logger.info("find servers request")
            params = struct_from_binary(ua.FindServersParameters, body)

            servers = self.iserver.find_servers(params)

            response = ua.FindServersResponse()
            response.Servers = servers

            self.logger.info("sending find servers response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServerRequest_Encoding_DefaultBinary):
            self.logger.info("register server request")
            serv = struct_from_binary(ua.RegisteredServer, body)

            self.iserver.register_server(serv)

            response = ua.RegisterServerResponse()

            self.logger.info("sending register server response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServer2Request_Encoding_DefaultBinary):
            self.logger.info("register server 2 request")
            params = struct_from_binary(ua.RegisterServer2Parameters, body)

            results = self.iserver.register_server2(params)

            response = ua.RegisterServer2Response()
            response.ConfigurationResults = results

            self.logger.info("sending register server 2 response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary):
            self.logger.info("translate browsepaths to nodeids request")
            params = struct_from_binary(ua.TranslateBrowsePathsToNodeIdsParameters, body)

            paths = self.session.translate_browsepaths_to_nodeids(params.BrowsePaths)

            response = ua.TranslateBrowsePathsToNodeIdsResponse()
            response.Results = paths

            self.logger.info("sending translate browsepaths to nodeids response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary):
            self.logger.info("add nodes request")
            params = struct_from_binary(ua.AddNodesParameters, body)

            results = self.session.add_nodes(params.NodesToAdd)

            response = ua.AddNodesResponse()
            response.Results = results

            self.logger.info("sending add node response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteNodesRequest_Encoding_DefaultBinary):
            self.logger.info("delete nodes request")
            params = struct_from_binary(ua.DeleteNodesParameters, body)

            results = self.session.delete_nodes(params)

            response = ua.DeleteNodesResponse()
            response.Results = results

            self.logger.info("sending delete node response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.AddReferencesRequest_Encoding_DefaultBinary):
            self.logger.info("add references request")
            params = struct_from_binary(ua.AddReferencesParameters, body)

            results = self.session.add_references(params.ReferencesToAdd)

            response = ua.AddReferencesResponse()
            response.Results = results

            self.logger.info("sending add references response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteReferencesRequest_Encoding_DefaultBinary):
            self.logger.info("delete references request")
            params = struct_from_binary(ua.DeleteReferencesParameters, body)

            results = self.session.delete_references(params.ReferencesToDelete)

            response = ua.DeleteReferencesResponse()
            # NOTE(review): sibling branches assign response.Results directly;
            # confirm DeleteReferencesResponse really nests it under Parameters.
            response.Parameters.Results = results

            self.logger.info("sending delete references response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CreateSubscriptionRequest_Encoding_DefaultBinary):
            self.logger.info("create subscription request")
            params = struct_from_binary(ua.CreateSubscriptionParameters, body)

            result = self.session.create_subscription(params, self.forward_publish_response)

            response = ua.CreateSubscriptionResponse()
            response.Parameters = result

            self.logger.info("sending create subscription response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ModifySubscriptionRequest_Encoding_DefaultBinary):
            self.logger.info("modify subscription request")
            params = struct_from_binary(ua.ModifySubscriptionParameters, body)

            result = self.session.modify_subscription(params, self.forward_publish_response)

            response = ua.ModifySubscriptionResponse()
            response.Parameters = result

            self.logger.info("sending modify subscription response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteSubscriptionsRequest_Encoding_DefaultBinary):
            self.logger.info("delete subscriptions request")
            params = struct_from_binary(ua.DeleteSubscriptionsParameters, body)

            results = self.session.delete_subscriptions(params.SubscriptionIds)

            response = ua.DeleteSubscriptionsResponse()
            response.Results = results

            self.logger.info("sending delte subscription response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CreateMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("create monitored items request")
            params = struct_from_binary(ua.CreateMonitoredItemsParameters, body)
            results = self.session.create_monitored_items(params)

            response = ua.CreateMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending create monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ModifyMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("modify monitored items request")
            params = struct_from_binary(ua.ModifyMonitoredItemsParameters, body)
            results = self.session.modify_monitored_items(params)

            response = ua.ModifyMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending modify monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("delete monitored items request")
            params = struct_from_binary(ua.DeleteMonitoredItemsParameters, body)

            results = self.session.delete_monitored_items(params)

            response = ua.DeleteMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending delete monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.HistoryReadRequest_Encoding_DefaultBinary):
            self.logger.info("history read request")
            params = struct_from_binary(ua.HistoryReadParameters, body)

            results = self.session.history_read(params)

            response = ua.HistoryReadResponse()
            response.Results = results

            self.logger.info("sending history read response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterNodesRequest_Encoding_DefaultBinary):
            self.logger.info("register nodes request")
            params = struct_from_binary(ua.RegisterNodesParameters, body)
            self.logger.info("Node registration not implemented")

            # The requested node ids are returned unchanged.
            response = ua.RegisterNodesResponse()
            response.Parameters.RegisteredNodeIds = params.NodesToRegister

            self.logger.info("sending register nodes response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.UnregisterNodesRequest_Encoding_DefaultBinary):
            self.logger.info("unregister nodes request")
            # The parameters are decoded but not used: an empty response is sent.
            struct_from_binary(ua.UnregisterNodesParameters, body)

            response = ua.UnregisterNodesResponse()

            self.logger.info("sending unregister nodes response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.PublishRequest_Encoding_DefaultBinary):
            self.logger.info("publish request")

            if not self.session:
                return False

            params = struct_from_binary(ua.PublishParameters, body)

            data = PublishRequestData()
            data.requesthdr = requesthdr
            data.seqhdr = seqhdr
            data.algohdr = algohdr
            with self._datalock:
                self._publishdata_queue.append(data)  # will be used to send publish answers from server
                # A result that arrived while no request was queued is sent now.
                if self._publish_result_queue:
                    result = self._publish_result_queue.pop(0)
                    self.forward_publish_response(result)
            self.session.publish(params.SubscriptionAcknowledgements)
            self.logger.info("publish forward to server")

        elif typeid == ua.NodeId(ua.ObjectIds.RepublishRequest_Encoding_DefaultBinary):
            self.logger.info("re-publish request")

            params = struct_from_binary(ua.RepublishParameters, body)
            msg = self.session.republish(params)

            response = ua.RepublishResponse()
            response.NotificationMessage = msg

            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CloseSecureChannelRequest_Encoding_DefaultBinary):
            self.logger.info("close secure channel request")
            self._connection.close()
            response = ua.CloseSecureChannelResponse()
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
            return False

        elif typeid == ua.NodeId(ua.ObjectIds.CallRequest_Encoding_DefaultBinary):
            self.logger.info("call request")

            params = struct_from_binary(ua.CallParameters, body)

            results = self.session.call(params.MethodsToCall)

            response = ua.CallResponse()
            response.Results = results

            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        else:
            self.logger.warning("Unknown message received %s", typeid)
            raise utils.ServiceError(ua.StatusCodes.BadNotImplemented)

        return True

    def _user_certificate_is_trusted(self, params, data):
        """
        Return True when the user token certificate in ``params`` equals a
        trusted certificate and the user token signature over ``data``
        verifies against it; False in every other case.
        """
        # Tokens without a CertificateData field cannot match anything.
        client_cert = getattr(params.UserIdentityToken, "CertificateData", None)
        if client_cert is None:
            return False

        # Imported and loaded only when there is a certificate to compare.
        from opcua.crypto import uacrypto
        # TODO: replace this hard-coded example file with a configurable
        # list of trusted certificates.
        trusted_certificates = [uacrypto.load_certificate('certificate-example.der')]

        for cert in trusted_certificates:
            if uacrypto.der_from_x509(cert) != client_cert:
                continue
            # even if certificates match, verify that the signature was made
            # with the matching key
            try:
                uacrypto.verify_sha1(cert, data, params.UserTokenSignature.Signature)
            except Exception:
                self.logger.warning('Faulty client signature received by server during connection activation')
                return False
            return True
        return False
520
521
    def close(self):
        """
        To be called when the client has disconnected, to ensure we really
        close everything we should.

        If a session exists it is closed with ``close_session(True)``.
        """
        self.logger.info("Cleanup client connection: %s", self.name)
        if self.session:
            self.session.close_session(True)
529