Test Failed
Pull Request — master (#490)
by Olivier
03:08
created

PublishRequestData   A

Complexity

Total Complexity 1

Size/Duplication

Total Lines 7
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 7
ccs 0
cts 6
cp 0
rs 10
wmc 1

1 Method

Rating   Name   Duplication   Size   Complexity  
A __init__() 0 5 1
1
2
import logging
3
from threading import RLock, Lock
4
import time
5
6
from opcua import ua
7
from opcua.ua.ua_binary import from_binary, to_binary
8
from opcua.common import utils
9
10
11
class PublishRequestData(object):
    """
    Bookkeeping record for one pending PublishRequest.

    Holds the headers needed to answer the request later, plus the time
    at which the record was created so the request age can be computed.
    """

    def __init__(self, requesthdr=None, algohdr=None, seqhdr=None):
        """
        All arguments are optional and default to None, so the record can
        still be created empty and filled in attribute by attribute.
        """
        # request header of the publish request
        self.requesthdr = requesthdr
        # security (algorithm) header the request arrived with
        self.algohdr = algohdr
        # sequence header the request arrived with
        self.seqhdr = seqhdr
        # creation time, in seconds as returned by time.time()
        self.timestamp = time.time()
18
19
20
class UaProcessor(object):
21
22
    def __init__(self, internal_server, socket):
        """
        Set up the per-connection state.

        internal_server is kept as self.iserver; socket is the transport
        object, which must offer get_extra_info() (used here) and write()
        (used when sending data).
        """
        self.logger = logging.getLogger(__name__)

        # collaborators
        self.iserver = internal_server
        self.socket = socket
        self.session = None  # no session until a CreateSessionRequest arrives

        # addresses reported by the transport
        self.name = socket.get_extra_info('peername')
        self.sockname = socket.get_extra_info('sockname')

        # one lock for writing to the socket, one for the publish queues
        self._socketlock = Lock()
        self._datalock = RLock()

        # pending publish requests, and results waiting for a publish request
        self._publishdata_queue = []
        self._publish_result_queue = []  # used when we need to wait for PublishRequest

        self._connection = ua.SecureConnection(ua.SecurityPolicy())
34
35
    def set_policies(self, policies):
        """
        Pass the given policy factories on to the secure connection.
        """
        connection = self._connection
        connection.set_policy_factories(policies)
37
38
    def send_response(self, requesthandle, algohdr, seqhdr, response, msgtype=ua.MessageType.SecureMessage):
        """
        Serialize response and write it to the socket.

        The request handle is copied into the response header, and the
        request id is taken from seqhdr. The whole operation runs under
        the socket lock.
        """
        with self._socketlock:
            response.ResponseHeader.RequestHandle = requesthandle
            payload = to_binary(response)
            packet = self._connection.message_to_binary(
                payload,
                message_type=msgtype,
                request_id=seqhdr.RequestId,
                algohdr=algohdr)
            self.socket.write(packet)
45
46
    def open_secure_channel(self, algohdr, seqhdr, body):
        """
        Handle an OpenSecureChannel request and answer it.

        Selects the security policy from the security header and the
        requested security mode, opens the channel on the connection and
        sends the result back as a SecureOpen message.
        """
        request = from_binary(ua.OpenSecureChannelRequest, body)
        channel_params = request.Parameters

        self._connection.select_policy(
            algohdr.SecurityPolicyURI,
            algohdr.SenderCertificate,
            channel_params.SecurityMode)

        opened = self._connection.open(channel_params, self.iserver)

        # send response; no algohdr is passed for this message type
        answer = ua.OpenSecureChannelResponse()
        answer.Parameters = opened
        handle = request.RequestHeader.RequestHandle
        self.send_response(handle, None, seqhdr, answer, ua.MessageType.SecureOpen)
57
58
    def forward_publish_response(self, result):
        """
        Send a publish result to the client using the oldest usable
        pending PublishRequest.

        If no publish request is pending, the result is stored in the
        result queue and the method returns without sending anything.
        Pending requests whose TimeoutHint has elapsed are discarded.
        """
        self.logger.info("forward publish response %s", result)
        with self._datalock:
            while True:
                if not self._publishdata_queue:
                    self._publish_result_queue.append(result)
                    self.logger.info("Server wants to send publish answer but no publish request is available,"
                                     "enqueing notification, length of result queue is %s",
                                     len(self._publish_result_queue))
                    return
                requestdata = self._publishdata_queue.pop(0)
                timeout_ms = requestdata.requesthdr.TimeoutHint
                # A TimeoutHint of 0 means the client set no timeout, so the
                # request never expires. Divide by 1000.0 so that Python 2
                # does not truncate sub-second hints to 0.
                if timeout_ms == 0 or time.time() - requestdata.timestamp < timeout_ms / 1000.0:
                    break

        response = ua.PublishResponse()
        response.Parameters = result

        # sent outside the data lock; send_response takes the socket lock itself
        self.send_response(requestdata.requesthdr.RequestHandle, requestdata.algohdr, requestdata.seqhdr, response)
76
77
    def process(self, header, body):
        """
        Handle one incoming chunk described by header and body.

        Returns False when a SecureClose message was received, the return
        value of process_message() for a SecureMessage, and True in every
        other handled case. Raises utils.ServiceError with
        BadTcpMessageTypeInvalid when the decoded object is of no known kind.
        """
        message = self._connection.receive_from_header_and_body(header, body)

        if isinstance(message, ua.Message):
            kind = header.MessageType
            if kind == ua.MessageType.SecureOpen:
                self.open_secure_channel(message.SecurityHeader(), message.SequenceHeader(), message.body())
                return True
            if kind == ua.MessageType.SecureClose:
                self._connection.close()
                return False
            if kind == ua.MessageType.SecureMessage:
                return self.process_message(message.SecurityHeader(), message.SequenceHeader(), message.body())
            # any other message type is ignored
            return True

        if isinstance(message, ua.Hello):
            # answer with an Acknowledge echoing the buffer sizes from the Hello
            reply = ua.Acknowledge()
            reply.ReceiveBufferSize = message.ReceiveBufferSize
            reply.SendBufferSize = message.SendBufferSize
            packet = self._connection.tcp_to_binary(ua.MessageType.Acknowledge, reply)
            self.socket.write(packet)
            return True

        if isinstance(message, ua.ErrorMessage):
            self.logger.warning("Received an error message type")
            return True

        if message is None:
            # msg is a ChunkType.Intermediate of an ua.MessageType.SecureMessage
            return True

        self.logger.warning("Unsupported message type: %s", header.MessageType)
        raise utils.ServiceError(ua.StatusCodes.BadTcpMessageTypeInvalid)
103
104
    def process_message(self, algohdr, seqhdr, body):
        """
        Decode the type id and request header from body and dispatch the
        request to _process_message().

        A utils.ServiceError raised during dispatch is turned into a
        ServiceFault response carrying the error code, and True is
        returned. Otherwise the result of _process_message() is returned.
        """
        # both are read from the same body object, type id first
        typeid = from_binary(ua.NodeId, body)
        requesthdr = from_binary(ua.RequestHeader, body)

        try:
            outcome = self._process_message(typeid, requesthdr, algohdr, seqhdr, body)
        except utils.ServiceError as err:
            status = ua.StatusCode(err.code)
            fault = ua.ServiceFault()
            fault.ResponseHeader.ServiceResult = status
            self.logger.info("sending service fault response: %s (%s)", status.doc, status.name)
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, fault)
            return True
        return outcome
116
117
    def _process_message(self, typeid, requesthdr, algohdr, seqhdr, body):
        """
        Dispatch one service request according to its type id.

        The matching parameters are decoded from body, the request is run
        against the session or the internal server, and the response is
        sent with send_response().

        Returns False for a CloseSecureChannel request and for a
        PublishRequest received while no session exists; True otherwise.

        Raises utils.ServiceError with BadSessionIdInvalid when a session
        is activated before being created, and with BadNotImplemented for
        an unknown type id.
        """
        if typeid == ua.NodeId(ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Create session request")
            params = from_binary(ua.CreateSessionParameters, body)

            # create the session on server
            self.session = self.iserver.create_session(self.name, external=True)
            # get a session creation result to send back
            sessiondata = self.session.create_session(params, sockname=self.sockname)

            response = ua.CreateSessionResponse()
            response.Parameters = sessiondata
            response.Parameters.ServerCertificate = self._connection._security_policy.client_certificate
            # data to sign: the policy's server_certificate (if any) followed by the client nonce
            if self._connection._security_policy.server_certificate is None:
                data = params.ClientNonce
            else:
                data = self._connection._security_policy.server_certificate + params.ClientNonce
            response.Parameters.ServerSignature.Signature = \
                self._connection._security_policy.asymmetric_cryptography.signature(data)

            response.Parameters.ServerSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"

            self.logger.info("sending create sesssion response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Close session request")
            deletesubs = ua.ua_binary.Primitives.Boolean.unpack(body)

            self.session.close_session(deletesubs)

            response = ua.CloseSessionResponse()
            self.logger.info("sending close sesssion response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Activate session request")
            params = from_binary(ua.ActivateSessionParameters, body)

            if not self.session:
                self.logger.info("request to activate non-existing session")
                raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)

            # data the client signed: the policy's client_certificate (if any) followed by the session nonce
            if self._connection._security_policy.client_certificate is None:
                data = self.session.nonce
            else:
                data = self._connection._security_policy.client_certificate + self.session.nonce
            self._connection._security_policy.asymmetric_cryptography.verify(data, params.ClientSignature.Signature)

            result = self.session.activate_session(params)

            response = ua.ActivateSessionResponse()
            response.Parameters = result

            # this message used to say "sending read response" (copy-paste slip)
            self.logger.info("sending activate session response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ReadRequest_Encoding_DefaultBinary):
            self.logger.info("Read request")
            params = from_binary(ua.ReadParameters, body)

            results = self.session.read(params)

            response = ua.ReadResponse()
            response.Results = results

            self.logger.info("sending read response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.WriteRequest_Encoding_DefaultBinary):
            self.logger.info("Write request")
            params = from_binary(ua.WriteParameters, body)

            results = self.session.write(params)

            response = ua.WriteResponse()
            response.Results = results

            self.logger.info("sending write response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.BrowseRequest_Encoding_DefaultBinary):
            self.logger.info("Browse request")
            params = from_binary(ua.BrowseParameters, body)

            results = self.session.browse(params)

            response = ua.BrowseResponse()
            response.Results = results

            self.logger.info("sending browse response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary):
            self.logger.info("get endpoints request")
            params = from_binary(ua.GetEndpointsParameters, body)

            endpoints = self.iserver.get_endpoints(params, sockname=self.sockname)

            response = ua.GetEndpointsResponse()
            response.Endpoints = endpoints

            self.logger.info("sending get endpoints response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.FindServersRequest_Encoding_DefaultBinary):
            self.logger.info("find servers request")
            params = from_binary(ua.FindServersParameters, body)

            servers = self.iserver.find_servers(params)

            response = ua.FindServersResponse()
            response.Servers = servers

            self.logger.info("sending find servers response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServerRequest_Encoding_DefaultBinary):
            self.logger.info("register server request")
            serv = from_binary(ua.RegisteredServer, body)

            self.iserver.register_server(serv)

            response = ua.RegisterServerResponse()

            self.logger.info("sending register server response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServer2Request_Encoding_DefaultBinary):
            self.logger.info("register server 2 request")
            # was misspelled "form_binary", which raised NameError on every such request
            params = from_binary(ua.RegisterServer2Parameters, body)

            results = self.iserver.register_server2(params)

            response = ua.RegisterServer2Response()
            response.ConfigurationResults = results

            self.logger.info("sending register server 2 response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary):
            self.logger.info("translate browsepaths to nodeids request")
            params = from_binary(ua.TranslateBrowsePathsToNodeIdsParameters, body)

            paths = self.session.translate_browsepaths_to_nodeids(params.BrowsePaths)

            response = ua.TranslateBrowsePathsToNodeIdsResponse()
            response.Results = paths

            self.logger.info("sending translate browsepaths to nodeids response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary):
            self.logger.info("add nodes request")
            params = from_binary(ua.AddNodesParameters, body)

            results = self.session.add_nodes(params.NodesToAdd)

            response = ua.AddNodesResponse()
            response.Results = results

            self.logger.info("sending add node response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteNodesRequest_Encoding_DefaultBinary):
            self.logger.info("delete nodes request")
            params = from_binary(ua.DeleteNodesParameters, body)

            results = self.session.delete_nodes(params)

            response = ua.DeleteNodesResponse()
            response.Results = results

            self.logger.info("sending delete node response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.AddReferencesRequest_Encoding_DefaultBinary):
            self.logger.info("add references request")
            params = from_binary(ua.AddReferencesParameters, body)

            results = self.session.add_references(params.ReferencesToAdd)

            response = ua.AddReferencesResponse()
            response.Results = results

            self.logger.info("sending add references response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteReferencesRequest_Encoding_DefaultBinary):
            self.logger.info("delete references request")
            params = from_binary(ua.DeleteReferencesParameters, body)

            results = self.session.delete_references(params.ReferencesToDelete)

            response = ua.DeleteReferencesResponse()
            response.Parameters.Results = results

            self.logger.info("sending delete references response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CreateSubscriptionRequest_Encoding_DefaultBinary):
            self.logger.info("create subscription request")
            params = from_binary(ua.CreateSubscriptionParameters, body)

            # forward_publish_response is handed over as the callback for publish results
            result = self.session.create_subscription(params, self.forward_publish_response)

            response = ua.CreateSubscriptionResponse()
            response.Parameters = result

            self.logger.info("sending create subscription response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteSubscriptionsRequest_Encoding_DefaultBinary):
            self.logger.info("delete subscriptions request")
            params = from_binary(ua.DeleteSubscriptionsParameters, body)

            results = self.session.delete_subscriptions(params.SubscriptionIds)

            response = ua.DeleteSubscriptionsResponse()
            response.Results = results

            self.logger.info("sending delte subscription response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CreateMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("create monitored items request")
            params = from_binary(ua.CreateMonitoredItemsParameters, body)
            results = self.session.create_monitored_items(params)

            response = ua.CreateMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending create monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ModifyMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("modify monitored items request")
            params = from_binary(ua.ModifyMonitoredItemsParameters, body)
            results = self.session.modify_monitored_items(params)

            response = ua.ModifyMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending modify monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("delete monitored items request")
            params = from_binary(ua.DeleteMonitoredItemsParameters, body)

            results = self.session.delete_monitored_items(params)

            response = ua.DeleteMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending delete monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.HistoryReadRequest_Encoding_DefaultBinary):
            self.logger.info("history read request")
            params = from_binary(ua.HistoryReadParameters, body)

            results = self.session.history_read(params)

            response = ua.HistoryReadResponse()
            response.Results = results

            self.logger.info("sending history read response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterNodesRequest_Encoding_DefaultBinary):
            self.logger.info("register nodes request")
            params = from_binary(ua.RegisterNodesParameters, body)
            self.logger.info("Node registration not implemented")

            # the requested node ids are simply echoed back
            response = ua.RegisterNodesResponse()
            response.Parameters.RegisteredNodeIds = params.NodesToRegister

            self.logger.info("sending register nodes response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.UnregisterNodesRequest_Encoding_DefaultBinary):
            self.logger.info("unregister nodes request")
            # decoded but otherwise unused; kept so the parameters are still read from body
            params = from_binary(ua.UnregisterNodesParameters, body)

            response = ua.UnregisterNodesResponse()

            self.logger.info("sending unregister nodes response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.PublishRequest_Encoding_DefaultBinary):
            self.logger.info("publish request")

            if not self.session:
                return False

            params = from_binary(ua.PublishParameters, body)

            data = PublishRequestData()
            data.requesthdr = requesthdr
            data.seqhdr = seqhdr
            data.algohdr = algohdr
            with self._datalock:
                self._publishdata_queue.append(data)  # will be used to send publish answers from server
                # a result was waiting for a publish request: forward it now
                if self._publish_result_queue:
                    result = self._publish_result_queue.pop(0)
                    self.forward_publish_response(result)
            self.session.publish(params.SubscriptionAcknowledgements)
            self.logger.info("publish forward to server")

        elif typeid == ua.NodeId(ua.ObjectIds.RepublishRequest_Encoding_DefaultBinary):
            self.logger.info("re-publish request")

            params = from_binary(ua.RepublishParameters, body)
            msg = self.session.republish(params)

            response = ua.RepublishResponse()
            response.NotificationMessage = msg

            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CloseSecureChannelRequest_Encoding_DefaultBinary):
            self.logger.info("close secure channel request")
            self._connection.close()
            response = ua.CloseSecureChannelResponse()
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
            return False

        elif typeid == ua.NodeId(ua.ObjectIds.CallRequest_Encoding_DefaultBinary):
            self.logger.info("call request")

            params = from_binary(ua.CallParameters, body)

            results = self.session.call(params.MethodsToCall)

            response = ua.CallResponse()
            response.Results = results

            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        else:
            self.logger.warning("Unknown message received %s", typeid)
            raise utils.ServiceError(ua.StatusCodes.BadNotImplemented)

        return True
463
464
    def close(self):
        """
        to be called when client has disconnected to ensure we really close
        everything we should

        Closes the session, if one exists, passing True to close_session().
        """
        # reported through the logger like every other message of this class
        self.logger.info("Cleanup client connection: %s", self.name)
        if self.session:
            self.session.close_session(True)
472