Completed
Push — master ( 4eddf2...83c23e )
by Olivier
04:06
created

UaProcessor.open_secure_channel()   A

Complexity

Conditions 1

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 11
ccs 7
cts 7
cp 1
crap 1
rs 9.4285
1
2 1
import logging
3 1
from threading import RLock, Lock
4 1
import time
5
6 1
from opcua import ua
7 1
from opcua.common import utils
8
9
10 1
class PublishRequestData(object):
    """Record kept for one PublishRequest that is waiting for an answer.

    The three header slots start out as ``None`` and are filled in by the
    code that queues the request; ``timestamp`` is the ``time.time()``
    value at which the record was created.
    """

    def __init__(self):
        # Headers are attached by the caller after construction.
        self.requesthdr = self.algohdr = self.seqhdr = None
        # Creation time in seconds since the epoch.
        self.timestamp = time.time()
17
18
19 1
class UaProcessor(object):
20
21 1
    def __init__(self, internal_server, socket):
        """Prepare the per-connection state for one client.

        :param internal_server: server object the requests are forwarded to,
            stored as ``self.iserver``
        :param socket: transport object; it must provide ``get_extra_info()``
            and is later used through ``write()`` to send replies
        """
        self.logger = logging.getLogger(__name__)
        self.iserver = internal_server
        self.socket = socket

        # Endpoint addresses as reported by the transport.
        self.name = socket.get_extra_info('peername')
        self.sockname = socket.get_extra_info('sockname')

        # No session exists until a CreateSession request has been handled.
        self.session = None

        # _socketlock is held while a response is written to the socket,
        # _datalock is held while the two publish queues are manipulated.
        self._socketlock = Lock()
        self._datalock = RLock()
        self._publishdata_queue = []
        self._publish_result_queue = []  # used when we need to wait for PublishRequest

        self._connection = ua.SecureConnection(ua.SecurityPolicy())
33
34 1
    def set_policies(self, policies):
        """Hand *policies* to the secure connection as its policy factories."""
        connection = self._connection
        connection.set_policy_factories(policies)
36
37 1
    def send_response(self, requesthandle, algohdr, seqhdr, response, msgtype=ua.MessageType.SecureMessage):
        """Serialize *response* and write it to the client socket.

        The request handle is stamped into the response header, the response
        is wrapped by the secure connection using the request id found in
        *seqhdr*, and the resulting bytes are written to the socket. The
        whole operation runs while holding the socket lock.
        """
        with self._socketlock:
            response.ResponseHeader.RequestHandle = requesthandle
            payload = response.to_binary()
            packet = self._connection.message_to_binary(
                payload,
                message_type=msgtype,
                request_id=seqhdr.RequestId,
                algohdr=algohdr)
            self.socket.write(packet)
44
45 1
    def open_secure_channel(self, algohdr, seqhdr, body):
        """Handle an OpenSecureChannel request and answer it.

        The request is decoded from *body*, the security policy named in
        *algohdr* is selected on the connection, the channel is opened and
        the result is sent back as a SecureOpen message.
        """
        request = ua.OpenSecureChannelRequest.from_binary(body)
        channel_params = request.Parameters

        self._connection.select_policy(
            algohdr.SecurityPolicyURI,
            algohdr.SenderCertificate,
            channel_params.SecurityMode)

        opened = self._connection.open(channel_params, self.iserver)

        # Build and send the answer; no algorithm header is passed on here.
        reply = ua.OpenSecureChannelResponse()
        reply.Parameters = opened
        handle = request.RequestHeader.RequestHandle
        self.send_response(handle, None, seqhdr, reply, ua.MessageType.SecureOpen)
56
57 1
    def forward_publish_response(self, result):
        """Send *result* to the client as the answer to a queued PublishRequest.

        Queued PublishRequests are consumed oldest first. Requests whose
        timeout hint has elapsed are dropped. If no usable request is left,
        *result* is stored in the pending-result queue so it can be delivered
        when the next PublishRequest arrives.

        :param result: publish result to place in the PublishResponse
        """
        self.logger.info("forward publish response %s", result)
        with self._datalock:
            while True:
                if not self._publishdata_queue:
                    self._publish_result_queue.append(result)
                    self.logger.info("Server wants to send publish answer but no publish request is available,"
                                     "enqueing notification, length of result queue is %s",
                                     len(self._publish_result_queue))
                    return
                requestdata = self._publishdata_queue.pop(0)
                timeout_ms = requestdata.requesthdr.TimeoutHint
                # A TimeoutHint of 0 means the client set no timeout, so the
                # request never expires. The float divisor keeps sub-second
                # hints from truncating to 0 under Python 2 integer division.
                if timeout_ms == 0 or time.time() - requestdata.timestamp < timeout_ms / 1000.0:
                    break

        response = ua.PublishResponse()
        response.Parameters = result

        self.send_response(requestdata.requesthdr.RequestHandle, requestdata.algohdr, requestdata.seqhdr, response)
75
76 1
    def process(self, header, body):
        """Decode one incoming chunk and dispatch it by message kind.

        Returns ``False`` when a SecureClose message was received, the result
        of ``process_message`` for a SecureMessage, and ``True`` otherwise.
        Raises ``utils.ServiceError`` with ``BadTcpMessageTypeInvalid`` when
        the decoded object is of none of the handled kinds.
        """
        incoming = self._connection.receive_from_header_and_body(header, body)

        if isinstance(incoming, ua.Message):
            kind = header.MessageType
            if kind == ua.MessageType.SecureOpen:
                self.open_secure_channel(incoming.SecurityHeader(), incoming.SequenceHeader(), incoming.body())
                return True
            if kind == ua.MessageType.SecureClose:
                self._connection.close()
                return False
            if kind == ua.MessageType.SecureMessage:
                return self.process_message(incoming.SecurityHeader(), incoming.SequenceHeader(), incoming.body())
            # Any other secure message type is ignored.
            return True

        if isinstance(incoming, ua.Hello):
            # Answer the Hello by echoing the buffer sizes the client announced.
            acknowledge = ua.Acknowledge()
            acknowledge.ReceiveBufferSize = incoming.ReceiveBufferSize
            acknowledge.SendBufferSize = incoming.SendBufferSize
            self.socket.write(self._connection.tcp_to_binary(ua.MessageType.Acknowledge, acknowledge))
            return True

        if isinstance(incoming, ua.ErrorMessage):
            self.logger.warning("Received an error message type")
            return True

        self.logger.warning("Unsupported message type: %s", header.MessageType)
        raise utils.ServiceError(ua.StatusCodes.BadTcpMessageTypeInvalid)
103
104 1
    def process_message(self, algohdr, seqhdr, body):
        """Decode the request type and header from *body* and run the service.

        A ``utils.ServiceError`` raised while handling the request is turned
        into a ServiceFault response carrying the error's status code; in
        that case ``True`` is returned. Otherwise the return value of
        ``_process_message`` is passed through.
        """
        # The type id comes first in the body, the request header follows it.
        typeid = ua.NodeId.from_binary(body)
        requesthdr = ua.RequestHeader.from_binary(body)
        try:
            outcome = self._process_message(typeid, requesthdr, algohdr, seqhdr, body)
        except utils.ServiceError as error:
            code = ua.StatusCode(error.code)
            fault = ua.ServiceFault()
            fault.ResponseHeader.ServiceResult = code
            self.logger.info("sending service fault response: %s (%s)", code.doc, code.name)
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, fault)
            return True
        return outcome
116
117 1
    def _process_message(self, typeid, requesthdr, algohdr, seqhdr, body):
        """Run the service identified by *typeid* and send its response.

        *body* is the request buffer positioned after the request header; the
        branch for each service decodes its own parameters from it. Every
        branch except PublishRequest sends its response directly through
        ``send_response``. A PublishRequest is queued and answered later by
        ``forward_publish_response``.

        :param typeid: NodeId of the binary encoding of the request
        :param requesthdr: decoded request header (provides RequestHandle)
        :param algohdr: security header of the incoming message
        :param seqhdr: sequence header of the incoming message
        :param body: buffer holding the remaining request payload
        :return: ``False`` after a CloseSecureChannel request or a
            PublishRequest received without a session, ``True`` otherwise
        :raises utils.ServiceError: ``BadSessionIdInvalid`` when activating a
            session that does not exist, ``BadNotImplemented`` for an
            unknown request type
        """
        if typeid == ua.NodeId(ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Create session request")
            params = ua.CreateSessionParameters.from_binary(body)

            # create the session on server
            self.session = self.iserver.create_session(self.name, external=True)
            # get a session creation result to send back
            sessiondata = self.session.create_session(params, sockname=self.sockname)

            response = ua.CreateSessionResponse()
            response.Parameters = sessiondata
            response.Parameters.ServerCertificate = self._connection._security_policy.client_certificate
            # The signed data is the policy's server_certificate (when set)
            # followed by the nonce sent by the client.
            if self._connection._security_policy.server_certificate is None:
                data = params.ClientNonce
            else:
                data = self._connection._security_policy.server_certificate + params.ClientNonce
            response.Parameters.ServerSignature.Signature = \
                self._connection._security_policy.asymmetric_cryptography.signature(data)

            response.Parameters.ServerSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"

            self.logger.info("sending create sesssion response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Close session request")
            deletesubs = ua.ua_binary.Primitives.Boolean.unpack(body)

            self.session.close_session(deletesubs)

            response = ua.CloseSessionResponse()
            self.logger.info("sending close sesssion response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Activate session request")
            params = ua.ActivateSessionParameters.from_binary(body)

            if not self.session:
                self.logger.info("request to activate non-existing session")
                raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)

            # The client signature is checked against the policy's
            # client_certificate (when set) followed by the session nonce.
            if self._connection._security_policy.client_certificate is None:
                data = self.session.nonce
            else:
                data = self._connection._security_policy.client_certificate + self.session.nonce
            self._connection._security_policy.asymmetric_cryptography.verify(data, params.ClientSignature.Signature)

            result = self.session.activate_session(params)

            response = ua.ActivateSessionResponse()
            response.Parameters = result

            # This message used to read "sending read response" (copy-paste
            # from the Read branch); it now names the service actually answered.
            self.logger.info("sending activate session response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ReadRequest_Encoding_DefaultBinary):
            self.logger.info("Read request")
            params = ua.ReadParameters.from_binary(body)

            results = self.session.read(params)

            response = ua.ReadResponse()
            response.Results = results

            self.logger.info("sending read response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.WriteRequest_Encoding_DefaultBinary):
            self.logger.info("Write request")
            params = ua.WriteParameters.from_binary(body)

            results = self.session.write(params)

            response = ua.WriteResponse()
            response.Results = results

            self.logger.info("sending write response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.BrowseRequest_Encoding_DefaultBinary):
            self.logger.info("Browse request")
            params = ua.BrowseParameters.from_binary(body)

            results = self.session.browse(params)

            response = ua.BrowseResponse()
            response.Results = results

            self.logger.info("sending browse response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary):
            self.logger.info("get endpoints request")
            params = ua.GetEndpointsParameters.from_binary(body)

            endpoints = self.iserver.get_endpoints(params, sockname=self.sockname)

            response = ua.GetEndpointsResponse()
            response.Endpoints = endpoints

            self.logger.info("sending get endpoints response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.FindServersRequest_Encoding_DefaultBinary):
            self.logger.info("find servers request")
            params = ua.FindServersParameters.from_binary(body)

            servers = self.iserver.find_servers(params)

            response = ua.FindServersResponse()
            response.Servers = servers

            self.logger.info("sending find servers response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServerRequest_Encoding_DefaultBinary):
            self.logger.info("register server request")
            serv = ua.RegisteredServer.from_binary(body)

            self.iserver.register_server(serv)

            response = ua.RegisterServerResponse()

            self.logger.info("sending register server response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServer2Request_Encoding_DefaultBinary):
            self.logger.info("register server 2 request")
            params = ua.RegisterServer2Parameters.from_binary(body)

            results = self.iserver.register_server2(params)

            response = ua.RegisterServer2Response()
            response.ConfigurationResults = results

            self.logger.info("sending register server 2 response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary):
            self.logger.info("translate browsepaths to nodeids request")
            params = ua.TranslateBrowsePathsToNodeIdsParameters.from_binary(body)

            paths = self.session.translate_browsepaths_to_nodeids(params.BrowsePaths)

            response = ua.TranslateBrowsePathsToNodeIdsResponse()
            response.Results = paths

            self.logger.info("sending translate browsepaths to nodeids response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary):
            self.logger.info("add nodes request")
            params = ua.AddNodesParameters.from_binary(body)

            results = self.session.add_nodes(params.NodesToAdd)

            response = ua.AddNodesResponse()
            response.Results = results

            self.logger.info("sending add node response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteNodesRequest_Encoding_DefaultBinary):
            self.logger.info("delete nodes request")
            params = ua.DeleteNodesParameters.from_binary(body)

            results = self.session.delete_nodes(params)

            response = ua.DeleteNodesResponse()
            response.Results = results

            self.logger.info("sending delete node response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.AddReferencesRequest_Encoding_DefaultBinary):
            self.logger.info("add references request")
            params = ua.AddReferencesParameters.from_binary(body)

            results = self.session.add_references(params.ReferencesToAdd)

            response = ua.AddReferencesResponse()
            response.Results = results

            self.logger.info("sending add references response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CreateSubscriptionRequest_Encoding_DefaultBinary):
            self.logger.info("create subscription request")
            params = ua.CreateSubscriptionParameters.from_binary(body)

            # forward_publish_response is passed as the callback through
            # which the subscription delivers its publish results.
            result = self.session.create_subscription(params, self.forward_publish_response)

            response = ua.CreateSubscriptionResponse()
            response.Parameters = result

            self.logger.info("sending create subscription response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteSubscriptionsRequest_Encoding_DefaultBinary):
            self.logger.info("delete subscriptions request")
            params = ua.DeleteSubscriptionsParameters.from_binary(body)

            results = self.session.delete_subscriptions(params.SubscriptionIds)

            response = ua.DeleteSubscriptionsResponse()
            response.Results = results

            self.logger.info("sending delte subscription response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CreateMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("create monitored items request")
            params = ua.CreateMonitoredItemsParameters.from_binary(body)
            results = self.session.create_monitored_items(params)

            response = ua.CreateMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending create monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ModifyMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("modify monitored items request")
            params = ua.ModifyMonitoredItemsParameters.from_binary(body)
            results = self.session.modify_monitored_items(params)

            response = ua.ModifyMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending modify monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("delete monitored items request")
            params = ua.DeleteMonitoredItemsParameters.from_binary(body)

            results = self.session.delete_monitored_items(params)

            response = ua.DeleteMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending delete monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.HistoryReadRequest_Encoding_DefaultBinary):
            self.logger.info("history read request")
            params = ua.HistoryReadParameters.from_binary(body)

            results = self.session.history_read(params)

            response = ua.HistoryReadResponse()
            response.Results = results

            self.logger.info("sending history read response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterNodesRequest_Encoding_DefaultBinary):
            self.logger.info("register nodes request")
            params = ua.RegisterNodesParameters.from_binary(body)
            self.logger.info("Node registration not implemented")

            # Registration is not implemented: the requested node ids are
            # returned unchanged as the registered ids.
            response = ua.RegisterNodesResponse()
            response.Parameters.RegisteredNodeIds = params.NodesToRegister

            self.logger.info("sending register nodes response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.UnregisterNodesRequest_Encoding_DefaultBinary):
            self.logger.info("unregister nodes request")
            # The parameters are decoded but not used; an empty response is sent.
            params = ua.UnregisterNodesParameters.from_binary(body)

            response = ua.UnregisterNodesResponse()

            self.logger.info("sending unregister nodes response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.PublishRequest_Encoding_DefaultBinary):
            self.logger.info("publish request")

            if not self.session:
                return False

            params = ua.PublishParameters.from_binary(body)

            data = PublishRequestData()
            data.requesthdr = requesthdr
            data.seqhdr = seqhdr
            data.algohdr = algohdr
            with self._datalock:
                self._publishdata_queue.append(data)  # will be used to send publish answers from server
                # A result that arrived while no request was queued is
                # delivered right away using the request just added.
                if self._publish_result_queue:
                    result = self._publish_result_queue.pop(0)
                    self.forward_publish_response(result)
            self.session.publish(params.SubscriptionAcknowledgements)
            self.logger.info("publish forward to server")

        elif typeid == ua.NodeId(ua.ObjectIds.RepublishRequest_Encoding_DefaultBinary):
            self.logger.info("re-publish request")

            params = ua.RepublishParameters.from_binary(body)
            msg = self.session.republish(params)

            response = ua.RepublishResponse()
            response.NotificationMessage = msg

            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CloseSecureChannelRequest_Encoding_DefaultBinary):
            self.logger.info("close secure channel request")
            self._connection.close()
            response = ua.CloseSecureChannelResponse()
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
            return False

        elif typeid == ua.NodeId(ua.ObjectIds.CallRequest_Encoding_DefaultBinary):
            self.logger.info("call request")

            params = ua.CallParameters.from_binary(body)

            results = self.session.call(params.MethodsToCall)

            response = ua.CallResponse()
            response.Results = results

            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        else:
            self.logger.warning("Unknown message received %s", typeid)
            raise utils.ServiceError(ua.StatusCodes.BadNotImplemented)

        return True
450
451 1
    def close(self):
        """
        to be called when client has disconnected to ensure we really close
        everything we should

        If a session exists it is closed with ``close_session(True)``.
        """
        # Report through the instance logger like every other method of this
        # class, instead of printing to stdout.
        self.logger.info("Cleanup client connection: %s", self.name)
        if self.session:
            self.session.close_session(True)
459