Completed
Pull Request — master (#490)
by Olivier
03:32
created

PublishRequestData   A

Complexity

Total Complexity 1

Size/Duplication

Total Lines 7
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 7
ccs 0
cts 6
cp 0
rs 10
wmc 1

1 Method

Rating   Name   Duplication   Size   Complexity  
A __init__() 0 5 1
1
2
import logging
3
from threading import RLock, Lock
4
import time
5
6
from opcua import ua
7
from opcua.ua.ua_binary import nodeid_from_binary, struct_from_binary
8
from opcua.ua.ua_binary import struct_to_binary
9
from opcua.common import utils
10
11
12
class PublishRequestData(object):
    """
    Container for the headers of a pending PublishRequest.

    The three header attributes start as None and are filled in by the
    code that creates the instance; timestamp records when the instance
    was created (seconds since the epoch, from time.time()).
    """

    def __init__(self):
        # Headers of the request, set by the caller after construction.
        self.requesthdr = None
        self.algohdr = None
        self.seqhdr = None
        # Creation time, taken once at construction.
        self.timestamp = time.time()
19
20
21
class UaProcessor(object):
    """
    Handle the messages received on one client connection.

    Incoming messages are decoded through a ua.SecureConnection, service
    requests are forwarded to the internal server or to the session
    created on it, and the matching responses are written to the socket.
    """

    def __init__(self, internal_server, socket):
        """
        internal_server: object used to create sessions and to answer
            endpoint / discovery requests
        socket: transport object; must provide get_extra_info() and write()
        """
        self.logger = logging.getLogger(__name__)
        self.iserver = internal_server
        self.name = socket.get_extra_info('peername')
        self.sockname = socket.get_extra_info('sockname')
        self.session = None
        self.socket = socket
        self._socketlock = Lock()  # held while encoding and writing a response
        self._datalock = RLock()  # guards the two publish queues below
        self._publishdata_queue = []
        self._publish_result_queue = []  # used when we need to wait for PublishRequest
        self._connection = ua.SecureConnection(ua.SecurityPolicy())

    def set_policies(self, policies):
        """
        Hand the given security policy factories over to the connection.
        """
        self._connection.set_policy_factories(policies)

    def send_response(self, requesthandle, algohdr, seqhdr, response, msgtype=ua.MessageType.SecureMessage):
        """
        Encode response and write it to the socket.

        The request handle is copied into the response header and the
        request id of seqhdr is reused for the outgoing message. Encoding
        and writing are done while holding the socket lock.
        """
        with self._socketlock:
            response.ResponseHeader.RequestHandle = requesthandle
            data = self._connection.message_to_binary(
                struct_to_binary(response), message_type=msgtype, request_id=seqhdr.RequestId, algohdr=algohdr)

            self.socket.write(data)

    def open_secure_channel(self, algohdr, seqhdr, body):
        """
        Process an OpenSecureChannel request and send the response.
        """
        request = struct_from_binary(ua.OpenSecureChannelRequest, body)

        self._connection.select_policy(
            algohdr.SecurityPolicyURI, algohdr.SenderCertificate, request.Parameters.SecurityMode)

        channel = self._connection.open(request.Parameters, self.iserver)
        # send response
        response = ua.OpenSecureChannelResponse()
        response.Parameters = channel
        self.send_response(request.RequestHeader.RequestHandle, None, seqhdr, response, ua.MessageType.SecureOpen)

    def forward_publish_response(self, result):
        """
        Send a publish result as answer to the oldest usable PublishRequest.

        Queued requests whose timeout hint has expired are dropped. If no
        request is left, the result is stored in the result queue and is
        sent when the next PublishRequest is processed.
        """
        self.logger.info("forward publish response %s", result)
        with self._datalock:
            while True:
                if not self._publishdata_queue:
                    self._publish_result_queue.append(result)
                    self.logger.info("Server wants to send publish answer but no publish request is available,"
                                     "enqueing notification, length of result queue is %s",
                                     len(self._publish_result_queue))
                    return
                requestdata = self._publishdata_queue.pop(0)
                timeout_hint = requestdata.requesthdr.TimeoutHint
                # A TimeoutHint of 0 means "no timeout" in OPC UA, so such a
                # request never expires. The hint is in milliseconds; divide
                # by a float so the result is not truncated on Python 2.
                if timeout_hint == 0 or time.time() - requestdata.timestamp < timeout_hint / 1000.0:
                    break

        response = ua.PublishResponse()
        response.Parameters = result

        self.send_response(requestdata.requesthdr.RequestHandle, requestdata.algohdr, requestdata.seqhdr, response)

    def process(self, header, body):
        """
        Process one received message.

        Returns False when a SecureClose message was received, the result
        of process_message() for a SecureMessage, and True otherwise.
        Raises utils.ServiceError(BadTcpMessageTypeInvalid) when the
        decoded message is of an unsupported kind.
        """
        msg = self._connection.receive_from_header_and_body(header, body)
        if isinstance(msg, ua.Message):
            if header.MessageType == ua.MessageType.SecureOpen:
                self.open_secure_channel(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())

            elif header.MessageType == ua.MessageType.SecureClose:
                self._connection.close()
                return False

            elif header.MessageType == ua.MessageType.SecureMessage:
                return self.process_message(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())
        elif isinstance(msg, ua.Hello):
            # answer with the buffer sizes announced by the client
            ack = ua.Acknowledge()
            ack.ReceiveBufferSize = msg.ReceiveBufferSize
            ack.SendBufferSize = msg.SendBufferSize
            data = self._connection.tcp_to_binary(ua.MessageType.Acknowledge, ack)
            self.socket.write(data)
        elif isinstance(msg, ua.ErrorMessage):
            self.logger.warning("Received an error message type")
        elif msg is None:
            pass  # msg is a ChunkType.Intermediate of an ua.MessageType.SecureMessage
        else:
            self.logger.warning("Unsupported message type: %s", header.MessageType)
            raise utils.ServiceError(ua.StatusCodes.BadTcpMessageTypeInvalid)
        return True

    def process_message(self, algohdr, seqhdr, body):
        """
        Decode type id and request header from body and dispatch the request.

        A utils.ServiceError raised while handling the request is turned
        into a ServiceFault response; True is returned in that case.
        """
        typeid = nodeid_from_binary(body)
        requesthdr = struct_from_binary(ua.RequestHeader, body)
        try:
            return self._process_message(typeid, requesthdr, algohdr, seqhdr, body)
        except utils.ServiceError as e:
            status = ua.StatusCode(e.code)
            response = ua.ServiceFault()
            response.ResponseHeader.ServiceResult = status
            self.logger.info("sending service fault response: %s (%s)", status.doc, status.name)
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
            return True

    def _process_message(self, typeid, requesthdr, algohdr, seqhdr, body):
        """
        Handle one service request identified by typeid.

        Returns True after a request has been handled, False for a
        CloseSecureChannel request and for a PublishRequest received
        while no session exists.
        Raises utils.ServiceError(BadSessionIdInvalid) when a session is
        activated before it was created, and
        utils.ServiceError(BadNotImplemented) for unknown request types.
        """
        if typeid == ua.NodeId(ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Create session request")
            params = struct_from_binary(ua.CreateSessionParameters, body)

            # create the session on server
            self.session = self.iserver.create_session(self.name, external=True)
            # get a session creation result to send back
            sessiondata = self.session.create_session(params, sockname=self.sockname)

            response = ua.CreateSessionResponse()
            response.Parameters = sessiondata
            response.Parameters.ServerCertificate = self._connection._security_policy.client_certificate
            # the signature covers the certificate (when there is one) followed by the client nonce
            if self._connection._security_policy.server_certificate is None:
                data = params.ClientNonce
            else:
                data = self._connection._security_policy.server_certificate + params.ClientNonce
            response.Parameters.ServerSignature.Signature = \
                self._connection._security_policy.asymmetric_cryptography.signature(data)

            response.Parameters.ServerSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"

            self.logger.info("sending create sesssion response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Close session request")
            deletesubs = ua.ua_binary.Primitives.Boolean.unpack(body)

            self.session.close_session(deletesubs)

            response = ua.CloseSessionResponse()
            self.logger.info("sending close sesssion response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Activate session request")
            params = struct_from_binary(ua.ActivateSessionParameters, body)

            if not self.session:
                self.logger.info("request to activate non-existing session")
                raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)

            # the client signature is verified against the certificate (when there is one) plus the session nonce
            if self._connection._security_policy.client_certificate is None:
                data = self.session.nonce
            else:
                data = self._connection._security_policy.client_certificate + self.session.nonce
            self._connection._security_policy.asymmetric_cryptography.verify(data, params.ClientSignature.Signature)

            result = self.session.activate_session(params)

            response = ua.ActivateSessionResponse()
            response.Parameters = result

            self.logger.info("sending activate session response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ReadRequest_Encoding_DefaultBinary):
            self.logger.info("Read request")
            params = struct_from_binary(ua.ReadParameters, body)

            results = self.session.read(params)

            response = ua.ReadResponse()
            response.Results = results

            self.logger.info("sending read response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.WriteRequest_Encoding_DefaultBinary):
            self.logger.info("Write request")
            params = struct_from_binary(ua.WriteParameters, body)

            results = self.session.write(params)

            response = ua.WriteResponse()
            response.Results = results

            self.logger.info("sending write response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.BrowseRequest_Encoding_DefaultBinary):
            self.logger.info("Browse request")
            params = struct_from_binary(ua.BrowseParameters, body)

            results = self.session.browse(params)

            response = ua.BrowseResponse()
            response.Results = results

            self.logger.info("sending browse response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary):
            self.logger.info("get endpoints request")
            params = struct_from_binary(ua.GetEndpointsParameters, body)

            endpoints = self.iserver.get_endpoints(params, sockname=self.sockname)

            response = ua.GetEndpointsResponse()
            response.Endpoints = endpoints

            self.logger.info("sending get endpoints response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.FindServersRequest_Encoding_DefaultBinary):
            self.logger.info("find servers request")
            params = struct_from_binary(ua.FindServersParameters, body)

            servers = self.iserver.find_servers(params)

            response = ua.FindServersResponse()
            response.Servers = servers

            self.logger.info("sending find servers response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServerRequest_Encoding_DefaultBinary):
            self.logger.info("register server request")
            serv = struct_from_binary(ua.RegisteredServer, body)

            self.iserver.register_server(serv)

            response = ua.RegisterServerResponse()

            self.logger.info("sending register server response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServer2Request_Encoding_DefaultBinary):
            self.logger.info("register server 2 request")
            # decode with struct_from_binary like every other request
            params = struct_from_binary(ua.RegisterServer2Parameters, body)

            results = self.iserver.register_server2(params)

            response = ua.RegisterServer2Response()
            response.ConfigurationResults = results

            self.logger.info("sending register server 2 response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary):
            self.logger.info("translate browsepaths to nodeids request")
            params = struct_from_binary(ua.TranslateBrowsePathsToNodeIdsParameters, body)

            paths = self.session.translate_browsepaths_to_nodeids(params.BrowsePaths)

            response = ua.TranslateBrowsePathsToNodeIdsResponse()
            response.Results = paths

            self.logger.info("sending translate browsepaths to nodeids response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary):
            self.logger.info("add nodes request")
            params = struct_from_binary(ua.AddNodesParameters, body)

            results = self.session.add_nodes(params.NodesToAdd)

            response = ua.AddNodesResponse()
            response.Results = results

            self.logger.info("sending add node response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteNodesRequest_Encoding_DefaultBinary):
            self.logger.info("delete nodes request")
            params = struct_from_binary(ua.DeleteNodesParameters, body)

            results = self.session.delete_nodes(params)

            response = ua.DeleteNodesResponse()
            response.Results = results

            self.logger.info("sending delete node response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.AddReferencesRequest_Encoding_DefaultBinary):
            self.logger.info("add references request")
            params = struct_from_binary(ua.AddReferencesParameters, body)

            results = self.session.add_references(params.ReferencesToAdd)

            response = ua.AddReferencesResponse()
            response.Results = results

            self.logger.info("sending add references response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteReferencesRequest_Encoding_DefaultBinary):
            self.logger.info("delete references request")
            params = struct_from_binary(ua.DeleteReferencesParameters, body)

            results = self.session.delete_references(params.ReferencesToDelete)

            response = ua.DeleteReferencesResponse()
            response.Parameters.Results = results

            self.logger.info("sending delete references response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CreateSubscriptionRequest_Encoding_DefaultBinary):
            self.logger.info("create subscription request")
            params = struct_from_binary(ua.CreateSubscriptionParameters, body)

            # forward_publish_response is handed to the session as the callback for this subscription
            result = self.session.create_subscription(params, self.forward_publish_response)

            response = ua.CreateSubscriptionResponse()
            response.Parameters = result

            self.logger.info("sending create subscription response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteSubscriptionsRequest_Encoding_DefaultBinary):
            self.logger.info("delete subscriptions request")
            params = struct_from_binary(ua.DeleteSubscriptionsParameters, body)

            results = self.session.delete_subscriptions(params.SubscriptionIds)

            response = ua.DeleteSubscriptionsResponse()
            response.Results = results

            self.logger.info("sending delte subscription response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CreateMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("create monitored items request")
            params = struct_from_binary(ua.CreateMonitoredItemsParameters, body)
            results = self.session.create_monitored_items(params)

            response = ua.CreateMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending create monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ModifyMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("modify monitored items request")
            params = struct_from_binary(ua.ModifyMonitoredItemsParameters, body)
            results = self.session.modify_monitored_items(params)

            response = ua.ModifyMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending modify monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("delete monitored items request")
            params = struct_from_binary(ua.DeleteMonitoredItemsParameters, body)

            results = self.session.delete_monitored_items(params)

            response = ua.DeleteMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending delete monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.HistoryReadRequest_Encoding_DefaultBinary):
            self.logger.info("history read request")
            params = struct_from_binary(ua.HistoryReadParameters, body)

            results = self.session.history_read(params)

            response = ua.HistoryReadResponse()
            response.Results = results

            self.logger.info("sending history read response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterNodesRequest_Encoding_DefaultBinary):
            self.logger.info("register nodes request")
            params = struct_from_binary(ua.RegisterNodesParameters, body)
            self.logger.info("Node registration not implemented")

            # no registration is done: the node ids received are returned unchanged
            response = ua.RegisterNodesResponse()
            response.Parameters.RegisteredNodeIds = params.NodesToRegister

            self.logger.info("sending register nodes response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.UnregisterNodesRequest_Encoding_DefaultBinary):
            self.logger.info("unregister nodes request")
            # the parameters are decoded but not used
            params = struct_from_binary(ua.UnregisterNodesParameters, body)

            response = ua.UnregisterNodesResponse()

            self.logger.info("sending unregister nodes response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.PublishRequest_Encoding_DefaultBinary):
            self.logger.info("publish request")

            if not self.session:
                return False

            params = struct_from_binary(ua.PublishParameters, body)

            data = PublishRequestData()
            data.requesthdr = requesthdr
            data.seqhdr = seqhdr
            data.algohdr = algohdr
            with self._datalock:
                self._publishdata_queue.append(data)  # will be used to send publish answers from server
                # a result was waiting for a request: send it now
                if self._publish_result_queue:
                    result = self._publish_result_queue.pop(0)
                    self.forward_publish_response(result)
            self.session.publish(params.SubscriptionAcknowledgements)
            self.logger.info("publish forward to server")

        elif typeid == ua.NodeId(ua.ObjectIds.RepublishRequest_Encoding_DefaultBinary):
            self.logger.info("re-publish request")

            params = struct_from_binary(ua.RepublishParameters, body)
            msg = self.session.republish(params)

            response = ua.RepublishResponse()
            response.NotificationMessage = msg

            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CloseSecureChannelRequest_Encoding_DefaultBinary):
            self.logger.info("close secure channel request")
            self._connection.close()
            response = ua.CloseSecureChannelResponse()
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
            return False

        elif typeid == ua.NodeId(ua.ObjectIds.CallRequest_Encoding_DefaultBinary):
            self.logger.info("call request")

            params = struct_from_binary(ua.CallParameters, body)

            results = self.session.call(params.MethodsToCall)

            response = ua.CallResponse()
            response.Results = results

            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        else:
            self.logger.warning("Unknown message received %s", typeid)
            raise utils.ServiceError(ua.StatusCodes.BadNotImplemented)

        return True

    def close(self):
        """
        to be called when client has disconnected to ensure we really close
        everything we should
        """
        # report through the logger like the rest of the class instead of printing to stdout
        self.logger.info("Cleanup client connection: %s", self.name)
        if self.session:
            self.session.close_session(True)
473