Passed
Push — master ( 5d9e87...cb3973 )
by Olivier
02:22
created

asyncua.server.uaprocessor.UaProcessor.process()   B

Complexity

Conditions 8

Size

Total Lines 24
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 8
eloc 23
nop 3
dl 0
loc 24
rs 7.3333
c 0
b 0
f 0
1
import time
2
import logging
3
from typing import Deque, Optional
4
from collections import deque
5
6
from asyncua import ua
7
from ..ua.ua_binary import nodeid_from_binary, struct_from_binary, struct_to_binary, uatcp_to_binary
8
from .internal_server import InternalServer, InternalSession
9
from ..common.connection import SecureConnection
10
from ..common.utils import ServiceError
11
12
_logger = logging.getLogger(__name__)
13
14
15
class PublishRequestData:
16
17
    def __init__(self, requesthdr=None, seqhdr=None):
18
        self.requesthdr = requesthdr
19
        self.seqhdr = seqhdr
20
        self.timestamp = time.time()
21
22
23
class UaProcessor:
24
    """
25
    Processor for OPC UA messages. Implements the OPC UA protocol for the server side.
26
    """
27
28
    def __init__(self, internal_server: InternalServer, transport):
29
        self.iserver: InternalServer = internal_server
30
        self.name = transport.get_extra_info('peername')
31
        self.sockname = transport.get_extra_info('sockname')
32
        self.session: Optional[InternalSession] = None
33
        self._transport = transport
34
        # deque for Publish Requests
35
        self._publish_requests: Deque[PublishRequestData] = deque()
36
        # used when we need to wait for PublishRequest
37
        self._publish_results: Deque[ua.PublishResult] = deque()
38
        self._connection = SecureConnection(ua.SecurityPolicy())
39
40
    def set_policies(self, policies):
41
        self._connection.set_policy_factories(policies)
42
43
    def send_response(self, requesthandle, seqhdr, response, msgtype=ua.MessageType.SecureMessage):
44
        response.ResponseHeader.RequestHandle = requesthandle
45
        data = self._connection.message_to_binary(
46
            struct_to_binary(response), message_type=msgtype, request_id=seqhdr.RequestId)
47
        self._transport.write(data)
48
49
    def open_secure_channel(self, algohdr, seqhdr, body):
50
        request = struct_from_binary(ua.OpenSecureChannelRequest, body)
51
52
        self._connection.select_policy(
53
            algohdr.SecurityPolicyURI, algohdr.SenderCertificate, request.Parameters.SecurityMode)
54
55
        channel = self._connection.open(request.Parameters, self.iserver)
56
        # send response
57
        response = ua.OpenSecureChannelResponse()
58
        response.Parameters = channel
59
        self.send_response(request.RequestHeader.RequestHandle, seqhdr, response, ua.MessageType.SecureOpen)
60
61
    def forward_publish_response(self, result: ua.PublishResult):
62
        """
63
        Try to send a `PublishResponse` with the given `PublishResult`.
64
        """
65
        #_logger.info("forward publish response %s", result)
66
        while True:
67
            if not self._publish_requests:
68
                self._publish_results.append(result)
69
                _logger.info(
70
                    "Server wants to send publish answer but no publish request is available,"
71
                    "enqueuing notification, length of result queue is %s",
72
                    len(self._publish_results)
73
                )
74
                return
75
            # We pop left from the Publish Request deque (FIFO)
76
            requestdata = self._publish_requests.popleft()
77
            if (requestdata.requesthdr.TimeoutHint == 0 or
78
                    requestdata.requesthdr.TimeoutHint != 0 and
79
                    time.time() - requestdata.timestamp < requestdata.requesthdr.TimeoutHint / 1000):
80
                # Continue and use `requestdata` only if there was no timeout
81
                break
82
        response = ua.PublishResponse()
83
        response.Parameters = result
84
        self.send_response(requestdata.requesthdr.RequestHandle, requestdata.seqhdr, response)
85
86
    async def process(self, header, body):
87
        msg = self._connection.receive_from_header_and_body(header, body)
88
        if isinstance(msg, ua.Message):
89
            if header.MessageType == ua.MessageType.SecureOpen:
90
                self.open_secure_channel(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())
91
            elif header.MessageType == ua.MessageType.SecureClose:
92
                self._connection.close()
93
                return False
94
            elif header.MessageType == ua.MessageType.SecureMessage:
95
                return await self.process_message(msg.SequenceHeader(), msg.body())
96
        elif isinstance(msg, ua.Hello):
97
            ack = ua.Acknowledge()
98
            ack.ReceiveBufferSize = msg.ReceiveBufferSize
99
            ack.SendBufferSize = msg.SendBufferSize
100
            data = uatcp_to_binary(ua.MessageType.Acknowledge, ack)
101
            self._transport.write(data)
102
        elif isinstance(msg, ua.ErrorMessage):
103
            _logger.warning("Received an error message type")
104
        elif msg is None:
105
            pass  # msg is a ChunkType.Intermediate of an ua.MessageType.SecureMessage
106
        else:
107
            _logger.warning("Unsupported message type: %s", header.MessageType)
108
            raise ServiceError(ua.StatusCodes.BadTcpMessageTypeInvalid)
109
        return True
110
111
    async def process_message(self, seqhdr, body):
112
        """
113
        Process incoming messages.
114
        """
115
        typeid = nodeid_from_binary(body)
116
        requesthdr = struct_from_binary(ua.RequestHeader, body)
117
        _logger.debug('process_message %r %r', typeid, requesthdr)
118
        try:
119
            return await self._process_message(typeid, requesthdr, seqhdr, body)
120
        except ServiceError as e:
121
            status = ua.StatusCode(e.code)
122
            response = ua.ServiceFault()
123
            response.ResponseHeader.ServiceResult = status
124
            _logger.error("sending service fault response: %s (%s)", status.doc, status.name)
125
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
126
            return True
127
        except Exception:
128
            _logger.exception('Error while processing message')
129
            response = ua.ServiceFault()
130
            response.ResponseHeader.ServiceResult = ua.StatusCode(ua.StatusCodes.BadInternalError)
131
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
132
            return True
133
134
    async def _process_message(self, typeid, requesthdr, seqhdr, body):
135
        if typeid == ua.NodeId(ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary):
136
            _logger.info("Create session request")
137
            params = struct_from_binary(ua.CreateSessionParameters, body)
138
            # create the session on server
139
            self.session = self.iserver.create_session(self.name, external=True)
140
            # get a session creation result to send back
141
            sessiondata = await self.session.create_session(params, sockname=self.sockname)
142
            response = ua.CreateSessionResponse()
143
            response.Parameters = sessiondata
144
            response.Parameters.ServerCertificate = self._connection.security_policy.client_certificate
145
            if self._connection.security_policy.server_certificate is None:
146
                data = params.ClientNonce
147
            else:
148
                data = self._connection.security_policy.server_certificate + params.ClientNonce
149
            response.Parameters.ServerSignature.Signature = \
150
                self._connection.security_policy.asymmetric_cryptography.signature(data)
151
            response.Parameters.ServerSignature.Algorithm = self._connection.security_policy.AsymmetricSignatureURI
152
            #_logger.info("sending create session response")
153
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
154
155
        elif typeid == ua.NodeId(ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary):
156
            _logger.info("Close session request")
157
            if self.session:
158
                deletesubs = ua.ua_binary.Primitives.Boolean.unpack(body)
159
                await self.session.close_session(deletesubs)
160
            else:
161
                _logger.info("Request to close non-existing session")
162
163
            response = ua.CloseSessionResponse()
164
            _logger.info("sending close session response")
165
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
166
167
        elif typeid == ua.NodeId(ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary):
168
            _logger.info("Activate session request")
169
            params = struct_from_binary(ua.ActivateSessionParameters, body)
170
            if not self.session:
171
                _logger.info("request to activate non-existing session")
172
                raise ServiceError(ua.StatusCodes.BadSessionIdInvalid)
173
            if self._connection.security_policy.client_certificate is None:
174
                data = self.session.nonce
175
            else:
176
                data = self._connection.security_policy.client_certificate + self.session.nonce
177
            self._connection.security_policy.asymmetric_cryptography.verify(data, params.ClientSignature.Signature)
178
            result = self.session.activate_session(params)
179
            response = ua.ActivateSessionResponse()
180
            response.Parameters = result
181
            #_logger.info("sending read response")
182
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
183
184
        elif typeid == ua.NodeId(ua.ObjectIds.ReadRequest_Encoding_DefaultBinary):
185
            _logger.info("Read request")
186
            params = struct_from_binary(ua.ReadParameters, body)
187
            results = await self.session.read(params)
188
            response = ua.ReadResponse()
189
            response.Results = results
190
            #_logger.info("sending read response")
191
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
192
193
        elif typeid == ua.NodeId(ua.ObjectIds.WriteRequest_Encoding_DefaultBinary):
194
            _logger.info("Write request")
195
            params = struct_from_binary(ua.WriteParameters, body)
196
            results = await self.session.write(params)
197
            response = ua.WriteResponse()
198
            response.Results = results
199
            #_logger.info("sending write response")
200
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
201
202
        elif typeid == ua.NodeId(ua.ObjectIds.BrowseRequest_Encoding_DefaultBinary):
203
            _logger.info("Browse request")
204
            params = struct_from_binary(ua.BrowseParameters, body)
205
            results = await self.session.browse(params)
206
            response = ua.BrowseResponse()
207
            response.Results = results
208
            #_logger.info("sending browse response")
209
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
210
211
        elif typeid == ua.NodeId(ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary):
212
            _logger.info("get endpoints request")
213
            params = struct_from_binary(ua.GetEndpointsParameters, body)
214
            endpoints = await self.iserver.get_endpoints(params, sockname=self.sockname)
215
            response = ua.GetEndpointsResponse()
216
            response.Endpoints = endpoints
217
            #_logger.info("sending get endpoints response")
218
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
219
220
        elif typeid == ua.NodeId(ua.ObjectIds.FindServersRequest_Encoding_DefaultBinary):
221
            _logger.info("find servers request")
222
            params = struct_from_binary(ua.FindServersParameters, body)
223
            servers = self.iserver.find_servers(params)
224
            response = ua.FindServersResponse()
225
            response.Servers = servers
226
            #_logger.info("sending find servers response")
227
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
228
229
        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServerRequest_Encoding_DefaultBinary):
230
            _logger.info("register server request")
231
            serv = struct_from_binary(ua.RegisteredServer, body)
232
            self.iserver.register_server(serv)
233
            response = ua.RegisterServerResponse()
234
            #_logger.info("sending register server response")
235
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
236
237
        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServer2Request_Encoding_DefaultBinary):
238
            _logger.info("register server 2 request")
239
            params = struct_from_binary(ua.RegisterServer2Parameters, body)
240
            results = self.iserver.register_server2(params)
241
            response = ua.RegisterServer2Response()
242
            response.ConfigurationResults = results
243
            #_logger.info("sending register server 2 response")
244
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
245
246
        elif typeid == ua.NodeId(ua.ObjectIds.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary):
247
            _logger.info("translate browsepaths to nodeids request")
248
            params = struct_from_binary(ua.TranslateBrowsePathsToNodeIdsParameters, body)
249
            paths = await self.session.translate_browsepaths_to_nodeids(params.BrowsePaths)
250
            response = ua.TranslateBrowsePathsToNodeIdsResponse()
251
            response.Results = paths
252
            #_logger.info("sending translate browsepaths to nodeids response")
253
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
254
255
        elif typeid == ua.NodeId(ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary):
256
            _logger.info("add nodes request")
257
            params = struct_from_binary(ua.AddNodesParameters, body)
258
            results = await self.session.add_nodes(params.NodesToAdd)
259
            response = ua.AddNodesResponse()
260
            response.Results = results
261
            #_logger.info("sending add node response")
262
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
263
264
        elif typeid == ua.NodeId(ua.ObjectIds.DeleteNodesRequest_Encoding_DefaultBinary):
265
            _logger.info("delete nodes request")
266
            params = struct_from_binary(ua.DeleteNodesParameters, body)
267
            results = await self.session.delete_nodes(params)
268
            response = ua.DeleteNodesResponse()
269
            response.Results = results
270
            #_logger.info("sending delete node response")
271
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
272
273
        elif typeid == ua.NodeId(ua.ObjectIds.AddReferencesRequest_Encoding_DefaultBinary):
274
            _logger.info("add references request")
275
            params = struct_from_binary(ua.AddReferencesParameters, body)
276
            results = await self.session.add_references(params.ReferencesToAdd)
277
            response = ua.AddReferencesResponse()
278
            response.Results = results
279
            #_logger.info("sending add references response")
280
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
281
282
        elif typeid == ua.NodeId(ua.ObjectIds.DeleteReferencesRequest_Encoding_DefaultBinary):
283
            _logger.info("delete references request")
284
            params = struct_from_binary(ua.DeleteReferencesParameters, body)
285
            results = await self.session.delete_references(params.ReferencesToDelete)
286
            response = ua.DeleteReferencesResponse()
287
            response.Parameters.Results = results
288
            #_logger.info("sending delete references response")
289
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
290
291
        elif typeid == ua.NodeId(ua.ObjectIds.CreateSubscriptionRequest_Encoding_DefaultBinary):
292
            _logger.info("create subscription request")
293
            params = struct_from_binary(ua.CreateSubscriptionParameters, body)
294
            result = await self.session.create_subscription(params, callback=self.forward_publish_response)
295
            response = ua.CreateSubscriptionResponse()
296
            response.Parameters = result
297
            #_logger.info("sending create subscription response")
298
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
299
300
        elif typeid == ua.NodeId(ua.ObjectIds.DeleteSubscriptionsRequest_Encoding_DefaultBinary):
301
            _logger.info("delete subscriptions request")
302
            params = struct_from_binary(ua.DeleteSubscriptionsParameters, body)
303
            results = await self.session.delete_subscriptions(params.SubscriptionIds)
304
            response = ua.DeleteSubscriptionsResponse()
305
            response.Results = results
306
            #_logger.info("sending delete subscription response")
307
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
308
309
        elif typeid == ua.NodeId(ua.ObjectIds.CreateMonitoredItemsRequest_Encoding_DefaultBinary):
310
            _logger.info("create monitored items request")
311
            params = struct_from_binary(ua.CreateMonitoredItemsParameters, body)
312
            results = await self.session.create_monitored_items(params)
313
            response = ua.CreateMonitoredItemsResponse()
314
            response.Results = results
315
            #_logger.info("sending create monitored items response")
316
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
317
318
        elif typeid == ua.NodeId(ua.ObjectIds.ModifyMonitoredItemsRequest_Encoding_DefaultBinary):
319
            _logger.info("modify monitored items request")
320
            params = struct_from_binary(ua.ModifyMonitoredItemsParameters, body)
321
            results = await self.session.modify_monitored_items(params)
322
            response = ua.ModifyMonitoredItemsResponse()
323
            response.Results = results
324
            #_logger.info("sending modify monitored items response")
325
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
326
327
        elif typeid == ua.NodeId(ua.ObjectIds.DeleteMonitoredItemsRequest_Encoding_DefaultBinary):
328
            _logger.info("delete monitored items request")
329
            params = struct_from_binary(ua.DeleteMonitoredItemsParameters, body)
330
            results = await self.session.delete_monitored_items(params)
331
            response = ua.DeleteMonitoredItemsResponse()
332
            response.Results = results
333
            #_logger.info("sending delete monitored items response")
334
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
335
336
        elif typeid == ua.NodeId(ua.ObjectIds.HistoryReadRequest_Encoding_DefaultBinary):
337
            _logger.info("history read request")
338
            params = struct_from_binary(ua.HistoryReadParameters, body)
339
            results = await self.session.history_read(params)
340
            response = ua.HistoryReadResponse()
341
            response.Results = results
342
            #_logger.info("sending history read response")
343
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
344
345
        elif typeid == ua.NodeId(ua.ObjectIds.RegisterNodesRequest_Encoding_DefaultBinary):
346
            _logger.info("register nodes request")
347
            params = struct_from_binary(ua.RegisterNodesParameters, body)
348
            _logger.info("Node registration not implemented")
349
            response = ua.RegisterNodesResponse()
350
            response.Parameters.RegisteredNodeIds = params.NodesToRegister
351
            #_logger.info("sending register nodes response")
352
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
353
354
        elif typeid == ua.NodeId(ua.ObjectIds.UnregisterNodesRequest_Encoding_DefaultBinary):
355
            _logger.info("unregister nodes request")
356
            params = struct_from_binary(ua.UnregisterNodesParameters, body)
357
            response = ua.UnregisterNodesResponse()
358
            #_logger.info("sending unregister nodes response")
359
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
360
361
        elif typeid == ua.NodeId(ua.ObjectIds.PublishRequest_Encoding_DefaultBinary):
362
            _logger.debug("publish request")
363
            if not self.session:
364
                return False
365
            params = struct_from_binary(ua.PublishParameters, body)
366
            data = PublishRequestData(requesthdr=requesthdr, seqhdr=seqhdr)
367
            # Store the Publish Request (will be used to send publish answers from server)
368
            self._publish_requests.append(data)
369
            # If there is an enqueued result forward it immediately
370
            while self._publish_results:
371
                result = self._publish_results.popleft()
372
                if result.SubscriptionId not in self.session.subscription_service.active_subscription_ids:
373
                    # Discard the result if the subscription is no longer active
374
                    continue
375
                self.forward_publish_response(result)
376
                break
377
            self.session.publish(params.SubscriptionAcknowledgements)
378
            #_logger.debug("publish forward to server")
379
380
        elif typeid == ua.NodeId(ua.ObjectIds.RepublishRequest_Encoding_DefaultBinary):
381
            _logger.info("re-publish request")
382
            params = struct_from_binary(ua.RepublishParameters, body)
383
            msg = self.session.republish(params)
384
            response = ua.RepublishResponse()
385
            response.NotificationMessage = msg
386
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
387
388
        elif typeid == ua.NodeId(ua.ObjectIds.CloseSecureChannelRequest_Encoding_DefaultBinary):
389
            _logger.info("close secure channel request")
390
            self._connection.close()
391
            response = ua.CloseSecureChannelResponse()
392
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
393
            return False
394
395
        elif typeid == ua.NodeId(ua.ObjectIds.CallRequest_Encoding_DefaultBinary):
396
            _logger.info("call request")
397
            params = struct_from_binary(ua.CallParameters, body)
398
            results = await self.session.call(params.MethodsToCall)
399
            response = ua.CallResponse()
400
            response.Results = results
401
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
402
403
        elif typeid == ua.NodeId(ua.ObjectIds.SetMonitoringModeRequest_Encoding_DefaultBinary):
404
            _logger.info("set monitoring mode request")
405
            params = struct_from_binary(ua.SetMonitoringModeParameters, body)
406
            # FIXME: Implement SetMonitoringMode
407
            # For now send dummy results to keep clients happy
408
            response = ua.SetMonitoringModeResponse()
409
            results = ua.SetMonitoringModeResult()
410
            ids = params.MonitoredItemIds
411
            statuses = [ua.StatusCode(ua.StatusCodes.Good) for node_id in ids]
412
            results.Results = statuses
413
            response.Parameters = results
414
            _logger.info("sending set monitoring mode response")
415
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
416
417
        elif typeid == ua.NodeId(ua.ObjectIds.SetPublishingModeRequest_Encoding_DefaultBinary):
418
            _logger.info("set publishing mode request")
419
            params = struct_from_binary(ua.SetPublishingModeParameters, body)
420
            # FIXME: Implement SetPublishingMode
421
            # For now send dummy results to keep clients happy
422
            response = ua.SetPublishingModeResponse()
423
            results = ua.SetPublishingModeResult()
424
            ids = params.SubscriptionIds
425
            statuses = [ua.StatusCode(ua.StatusCodes.Good) for node_id in ids]
426
            results.Results = statuses
427
            response.Parameters = results
428
            _logger.info("sending set publishing mode response")
429
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
430
431
        else:
432
            _logger.warning("Unknown message received %s", typeid)
433
            raise ServiceError(ua.StatusCodes.BadServiceUnsupported)
434
435
        return True
436
437
    async def close(self):
438
        """
439
        to be called when client has disconnected to ensure we really close
440
        everything we should
441
        """
442
        _logger.info("Cleanup client connection: %s", self.name)
443
        if self.session:
444
            await self.session.close_session(True)
445