Passed
Push — master ( b0b54c...47920a )
by Olivier
02:38
created

asyncua.server.uaprocessor.UaProcessor.process()   C

Complexity

Conditions 9

Size

Total Lines 28
Code Lines 27

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 9
eloc 27
nop 3
dl 0
loc 28
rs 6.6666
c 0
b 0
f 0
1
import time
2
import logging
3
from typing import Deque, Optional
4
from collections import deque
5
6
from asyncua import ua
7
from ..ua.ua_binary import nodeid_from_binary, struct_from_binary, struct_to_binary, uatcp_to_binary
8
from .internal_server import InternalServer, InternalSession
9
from ..common.connection import SecureConnection
10
from ..common.utils import ServiceError
11
12
# Module-level logger, named after this module through ``__name__``.
_logger = logging.getLogger(__name__)
13
14
15
class PublishRequestData:
    """
    Record kept for one received publish request.

    Stores the request header and the sequence header that were passed in,
    plus the ``time.time()`` value taken when the record was created.
    """

    def __init__(self, requesthdr=None, seqhdr=None):
        self.requesthdr, self.seqhdr = requesthdr, seqhdr
        # Creation time (seconds since the epoch, as returned by time.time()).
        self.timestamp = time.time()
21
22
23
class UaProcessor:
24
    """
25
    Processor for OPC UA messages. Implements the OPC UA protocol for the server side.
26
    """
27
28
    def __init__(self, internal_server: InternalServer, transport):
        """
        Bind the processor to an internal server and a transport.

        :param internal_server: server object used later to create sessions
            and answer discovery requests
        :param transport: object offering ``get_extra_info()``; it is kept
            and used to write outgoing data
        """
        self.iserver: InternalServer = internal_server
        # Peer and local socket names as reported by the transport.
        self.name = transport.get_extra_info('peername')
        self.sockname = transport.get_extra_info('sockname')
        self._transport = transport

        # No session exists until a CreateSession request has been handled.
        self.session: Optional[InternalSession] = None
        self._connection = SecureConnection(ua.SecurityPolicy())

        # Publish requests received from the client, oldest first.
        self._publish_requests: Deque[PublishRequestData] = deque()
        # Publish results waiting for a publish request to answer.
        self._publish_results: Deque[ua.PublishResult] = deque()
39
40
    def set_policies(self, policies):
        """
        Pass the given security policy factories on to the secure connection.
        """
        connection = self._connection
        connection.set_policy_factories(policies)
42
43
    def send_response(self, requesthandle, seqhdr, response, msgtype=ua.MessageType.SecureMessage):
        """
        Serialize ``response`` and write it to the transport.

        :param requesthandle: value stored in ``response.ResponseHeader.RequestHandle``
        :param seqhdr: sequence header; its ``RequestId`` is used as request id
        :param response: response structure to serialize and send
        :param msgtype: message type handed to the connection when framing
        """
        response.ResponseHeader.RequestHandle = requesthandle
        payload = struct_to_binary(response)
        frame = self._connection.message_to_binary(
            payload,
            message_type=msgtype,
            request_id=seqhdr.RequestId,
        )
        self._transport.write(frame)
48
49
    def open_secure_channel(self, algohdr, seqhdr, body):
        """
        Handle an OpenSecureChannel request and send the response.

        :param algohdr: security header; supplies the policy URI and the
            sender certificate used to select the security policy
        :param seqhdr: sequence header of the request, reused for the response
        :param body: buffer the ``OpenSecureChannelRequest`` is decoded from
        """
        request = struct_from_binary(ua.OpenSecureChannelRequest, body)
        params = request.Parameters

        # The policy has to be selected before the channel is opened.
        self._connection.select_policy(
            algohdr.SecurityPolicyURI,
            algohdr.SenderCertificate,
            params.SecurityMode,
        )
        channel = self._connection.open(params, self.iserver)

        reply = ua.OpenSecureChannelResponse()
        reply.Parameters = channel
        self.send_response(request.RequestHeader.RequestHandle, seqhdr, reply, ua.MessageType.SecureOpen)
60
61
    async def forward_publish_response(self, result: ua.PublishResult):
        """
        Try to send a `PublishResponse` with the given `PublishResult`.

        Stored publish requests are taken oldest first. A request is used when
        its ``TimeoutHint`` is 0 or when less than ``TimeoutHint`` milliseconds
        have passed since the request was stored; other requests are dropped.
        If no usable request is left, ``result`` is appended to the result
        queue instead and nothing is sent.

        :param result: publish result to put into the response
        """
        while self._publish_requests:
            # We pop left from the Publish Request deque (FIFO)
            requestdata = self._publish_requests.popleft()
            timeout_ms = requestdata.requesthdr.TimeoutHint
            if timeout_ms == 0 or time.time() - requestdata.timestamp < timeout_ms / 1000:
                # Continue and use `requestdata` only if there was no timeout
                break
        else:
            # The deque ran empty without finding a usable request.
            self._publish_results.append(result)
            _logger.info(
                "Server wants to send publish answer but no publish request is available, "
                "enqueuing notification, length of result queue is %s",
                len(self._publish_results)
            )
            return
        response = ua.PublishResponse()
        response.Parameters = result
        self.send_response(requestdata.requesthdr.RequestHandle, requestdata.seqhdr, response)
85
86
    async def process(self, header, body):
        """
        Handle one received message given as header and body.

        :return: ``False`` when receiving raises ``BadUserAccessDenied`` or
            when the message is a ``SecureClose``; the result of
            ``process_message()`` for a ``SecureMessage``; ``True`` otherwise
        :raises ServiceError: with ``BadTcpMessageTypeInvalid`` when the
            received object is of an unsupported type
        """
        try:
            msg = self._connection.receive_from_header_and_body(header, body)
        except ua.uaerrors.BadUserAccessDenied:
            _logger.warning("Unauthenticated user attempted to connect")
            return False

        if msg is None:
            # msg is a ChunkType.Intermediate of an ua.MessageType.SecureMessage
            return True

        if isinstance(msg, ua.Message):
            msg_type = header.MessageType
            if msg_type == ua.MessageType.SecureOpen:
                self.open_secure_channel(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())
                return True
            if msg_type == ua.MessageType.SecureClose:
                self._connection.close()
                return False
            if msg_type == ua.MessageType.SecureMessage:
                return await self.process_message(msg.SequenceHeader(), msg.body())
            # Any other message type on an ua.Message is ignored.
            return True

        if isinstance(msg, ua.Hello):
            # Answer with an Acknowledge that mirrors the client's buffer sizes.
            ack = ua.Acknowledge()
            ack.ReceiveBufferSize = msg.ReceiveBufferSize
            ack.SendBufferSize = msg.SendBufferSize
            self._transport.write(uatcp_to_binary(ua.MessageType.Acknowledge, ack))
            return True

        if isinstance(msg, ua.ErrorMessage):
            _logger.warning("Received an error message type")
            return True

        _logger.warning("Unsupported message type: %s", header.MessageType)
        raise ServiceError(ua.StatusCodes.BadTcpMessageTypeInvalid)
114
115
    async def process_message(self, seqhdr, body):
        """
        Process incoming messages.

        Reads the type NodeId and then the request header from ``body``, hands
        the request to ``_process_message()`` and turns exceptions raised
        there into a ``ServiceFault`` response.

        :return: the result of ``_process_message()``; ``True`` after a fault
            was sent for a ``ServiceError`` or an unexpected exception;
            ``None`` after a fault was sent for ``BadUserAccessDenied``
        """
        typeid = nodeid_from_binary(body)
        requesthdr = struct_from_binary(ua.RequestHeader, body)
        _logger.debug('process_message %r %r', typeid, requesthdr)
        try:
            return await self._process_message(typeid, requesthdr, seqhdr, body)
        except ServiceError as err:
            fault_status = ua.StatusCode(err.code)
            fault = ua.ServiceFault()
            fault.ResponseHeader.ServiceResult = fault_status
            _logger.error("sending service fault response: %s (%s)", fault_status.doc, fault_status.name)
            self.send_response(requesthdr.RequestHandle, seqhdr, fault)
            return True
        except ua.uaerrors.BadUserAccessDenied:
            user = self.session.user if self.session else 'Someone'
            _logger.warning("%s attempted to do something they are not permitted to do", user)
            fault = ua.ServiceFault()
            fault.ResponseHeader.ServiceResult = ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
            self.send_response(requesthdr.RequestHandle, seqhdr, fault)
            # NOTE(review): unlike the other handlers this one yields None
            # (falsy), not True - confirm with the caller whether that is intended.
            return None
        except Exception:
            _logger.exception('Error while processing message')
            fault = ua.ServiceFault()
            fault.ResponseHeader.ServiceResult = ua.StatusCode(ua.StatusCodes.BadInternalError)
            self.send_response(requesthdr.RequestHandle, seqhdr, fault)
            return True
146
147
    async def _process_message(self, typeid, requesthdr, seqhdr, body):
        """
        Handle one service request and send its response.

        The request is selected by comparing ``typeid`` with the binary
        encoding NodeIds of the supported services; its parameters are decoded
        from ``body``.

        CreateSession and ActivateSession are always handled without a user.
        GetEndpoints and FindServers are also accepted while no session
        exists. Every other request needs an existing session and, when the
        security policy has a ``permissions`` object, must pass its
        ``check_validity()``.

        :param typeid: NodeId identifying the request type
        :param requesthdr: decoded request header
        :param seqhdr: sequence header of the request, reused for the response
        :param body: buffer holding the remaining request parameters
        :return: ``False`` for a CloseSecureChannel request (and for a publish
            request without session), ``True`` otherwise
        :raises ua.uaerrors.BadUserAccessDenied: when a session is required
            but missing, or when the permission check fails
        :raises ServiceError: ``BadSessionIdInvalid`` when activating without
            a session, ``BadServiceUnsupported`` for unknown request types
        """
        session_setup_ids = (
            ua.NodeId(ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary),
            ua.NodeId(ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary),
        )
        discovery_ids = (
            ua.NodeId(ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary),
            ua.NodeId(ua.ObjectIds.FindServersRequest_Encoding_DefaultBinary),
        )
        if typeid in session_setup_ids or (self.session is None and typeid in discovery_ids):
            # Session creation and activation run before a user is attached to
            # the connection, so there is no user to check. GetEndpoints and
            # FindServers only use the internal server, not the session, and
            # clients issue them before creating a session.
            user = None
        elif self.session is None:
            raise ua.uaerrors.BadUserAccessDenied
        else:
            user = self.session.user
            permissions = self._connection.security_policy.permissions
            if permissions is not None and permissions.check_validity(user, typeid, body) is False:
                raise ua.uaerrors.BadUserAccessDenied

        if typeid == ua.NodeId(ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary):
            _logger.info("Create session request (%s)", user)
            params = struct_from_binary(ua.CreateSessionParameters, body)
            # create the session on server
            self.session = self.iserver.create_session(self.name, external=True)
            # get a session creation result to send back
            sessiondata = await self.session.create_session(params, sockname=self.sockname)
            response = ua.CreateSessionResponse()
            response.Parameters = sessiondata
            response.Parameters.ServerCertificate = self._connection.security_policy.host_certificate
            # The server signs the client certificate (if any) followed by the client nonce.
            if self._connection.security_policy.peer_certificate is None:
                data = params.ClientNonce
            else:
                data = self._connection.security_policy.peer_certificate + params.ClientNonce
            response.Parameters.ServerSignature.Signature = \
                self._connection.security_policy.asymmetric_cryptography.signature(data)
            response.Parameters.ServerSignature.Algorithm = self._connection.security_policy.AsymmetricSignatureURI
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary):
            _logger.info("Close session request (%s)", user)
            if self.session:
                deletesubs = ua.ua_binary.Primitives.Boolean.unpack(body)
                await self.session.close_session(deletesubs)
            else:
                _logger.info("Request to close non-existing session (%s)", user)

            response = ua.CloseSessionResponse()
            _logger.info("sending close session response (%s)", user)
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary):
            _logger.info("Activate session request (%s)", user)
            params = struct_from_binary(ua.ActivateSessionParameters, body)
            if not self.session:
                _logger.info("request to activate non-existing session (%s)", user)
                raise ServiceError(ua.StatusCodes.BadSessionIdInvalid)
            # The client signature is verified over the server certificate
            # (if any) followed by the session nonce.
            if self._connection.security_policy.host_certificate is None:
                data = self.session.nonce
            else:
                data = self._connection.security_policy.host_certificate + self.session.nonce
            self._connection.security_policy.asymmetric_cryptography.verify(data, params.ClientSignature.Signature)
            result = self.session.activate_session(params, self._connection.security_policy.peer_certificate)
            response = ua.ActivateSessionResponse()
            response.Parameters = result
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ReadRequest_Encoding_DefaultBinary):
            _logger.info("Read request (%s)", user)
            params = struct_from_binary(ua.ReadParameters, body)
            results = await self.session.read(params)
            response = ua.ReadResponse()
            response.Results = results
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.WriteRequest_Encoding_DefaultBinary):
            _logger.info("Write request (%s)", user)
            params = struct_from_binary(ua.WriteParameters, body)
            results = await self.session.write(params)
            response = ua.WriteResponse()
            response.Results = results
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.BrowseRequest_Encoding_DefaultBinary):
            _logger.info("Browse request (%s)", user)
            params = struct_from_binary(ua.BrowseParameters, body)
            results = await self.session.browse(params)
            response = ua.BrowseResponse()
            response.Results = results
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary):
            _logger.info("get endpoints request (%s)", user)
            params = struct_from_binary(ua.GetEndpointsParameters, body)
            endpoints = await self.iserver.get_endpoints(params, sockname=self.sockname)
            response = ua.GetEndpointsResponse()
            response.Endpoints = endpoints
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.FindServersRequest_Encoding_DefaultBinary):
            _logger.info("find servers request (%s)", user)
            params = struct_from_binary(ua.FindServersParameters, body)
            servers = self.iserver.find_servers(params)
            response = ua.FindServersResponse()
            response.Servers = servers
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServerRequest_Encoding_DefaultBinary):
            _logger.info("register server request %s", user)
            serv = struct_from_binary(ua.RegisteredServer, body)
            self.iserver.register_server(serv)
            response = ua.RegisterServerResponse()
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServer2Request_Encoding_DefaultBinary):
            _logger.info("register server 2 request %s", user)
            params = struct_from_binary(ua.RegisterServer2Parameters, body)
            results = self.iserver.register_server2(params)
            response = ua.RegisterServer2Response()
            response.ConfigurationResults = results
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary):
            _logger.info("translate browsepaths to nodeids request (%s)", user)
            params = struct_from_binary(ua.TranslateBrowsePathsToNodeIdsParameters, body)
            paths = await self.session.translate_browsepaths_to_nodeids(params.BrowsePaths)
            response = ua.TranslateBrowsePathsToNodeIdsResponse()
            response.Results = paths
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary):
            _logger.info("add nodes request (%s)", user)
            params = struct_from_binary(ua.AddNodesParameters, body)
            results = await self.session.add_nodes(params.NodesToAdd)
            response = ua.AddNodesResponse()
            response.Results = results
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteNodesRequest_Encoding_DefaultBinary):
            _logger.info("delete nodes request (%s)", user)
            params = struct_from_binary(ua.DeleteNodesParameters, body)
            results = await self.session.delete_nodes(params)
            response = ua.DeleteNodesResponse()
            response.Results = results
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.AddReferencesRequest_Encoding_DefaultBinary):
            _logger.info("add references request (%s)", user)
            params = struct_from_binary(ua.AddReferencesParameters, body)
            results = await self.session.add_references(params.ReferencesToAdd)
            response = ua.AddReferencesResponse()
            response.Results = results
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteReferencesRequest_Encoding_DefaultBinary):
            _logger.info("delete references request (%s)", user)
            params = struct_from_binary(ua.DeleteReferencesParameters, body)
            results = await self.session.delete_references(params.ReferencesToDelete)
            response = ua.DeleteReferencesResponse()
            response.Parameters.Results = results
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CreateSubscriptionRequest_Encoding_DefaultBinary):
            _logger.info("create subscription request (%s)", user)
            params = struct_from_binary(ua.CreateSubscriptionParameters, body)
            # Publish results of the new subscription are delivered through forward_publish_response.
            result = await self.session.create_subscription(params, callback=self.forward_publish_response)
            response = ua.CreateSubscriptionResponse()
            response.Parameters = result
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteSubscriptionsRequest_Encoding_DefaultBinary):
            _logger.info("delete subscriptions request (%s)", user)
            params = struct_from_binary(ua.DeleteSubscriptionsParameters, body)
            results = await self.session.delete_subscriptions(params.SubscriptionIds)
            response = ua.DeleteSubscriptionsResponse()
            response.Results = results
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CreateMonitoredItemsRequest_Encoding_DefaultBinary):
            _logger.info("create monitored items request (%s)", user)
            params = struct_from_binary(ua.CreateMonitoredItemsParameters, body)
            results = await self.session.create_monitored_items(params)
            response = ua.CreateMonitoredItemsResponse()
            response.Results = results
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ModifyMonitoredItemsRequest_Encoding_DefaultBinary):
            _logger.info("modify monitored items request (%s)", user)
            params = struct_from_binary(ua.ModifyMonitoredItemsParameters, body)
            results = await self.session.modify_monitored_items(params)
            response = ua.ModifyMonitoredItemsResponse()
            response.Results = results
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteMonitoredItemsRequest_Encoding_DefaultBinary):
            _logger.info("delete monitored items request (%s)", user)
            params = struct_from_binary(ua.DeleteMonitoredItemsParameters, body)
            results = await self.session.delete_monitored_items(params)
            response = ua.DeleteMonitoredItemsResponse()
            response.Results = results
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.HistoryReadRequest_Encoding_DefaultBinary):
            _logger.info("history read request (%s)", user)
            params = struct_from_binary(ua.HistoryReadParameters, body)
            results = await self.session.history_read(params)
            response = ua.HistoryReadResponse()
            response.Results = results
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterNodesRequest_Encoding_DefaultBinary):
            _logger.info("register nodes request (%s)", user)
            params = struct_from_binary(ua.RegisterNodesParameters, body)
            _logger.info("Node registration not implemented")
            # The requested node ids are echoed back unchanged.
            response = ua.RegisterNodesResponse()
            response.Parameters.RegisteredNodeIds = params.NodesToRegister
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.UnregisterNodesRequest_Encoding_DefaultBinary):
            _logger.info("unregister nodes request (%s)", user)
            # The parameters are still decoded, but their content is not used.
            struct_from_binary(ua.UnregisterNodesParameters, body)
            response = ua.UnregisterNodesResponse()
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.PublishRequest_Encoding_DefaultBinary):
            _logger.debug("publish request (%s)", user)
            if not self.session:
                return False
            params = struct_from_binary(ua.PublishParameters, body)
            data = PublishRequestData(requesthdr=requesthdr, seqhdr=seqhdr)
            # Store the Publish Request (will be used to send publish answers from server)
            self._publish_requests.append(data)
            # If there is an enqueued result forward it immediately
            while self._publish_results:
                result = self._publish_results.popleft()
                if result.SubscriptionId not in self.session.subscription_service.active_subscription_ids:
                    # Discard the result if the subscription is no longer active
                    continue
                await self.forward_publish_response(result)
                break
            self.session.publish(params.SubscriptionAcknowledgements)

        elif typeid == ua.NodeId(ua.ObjectIds.RepublishRequest_Encoding_DefaultBinary):
            _logger.info("re-publish request (%s)", user)
            params = struct_from_binary(ua.RepublishParameters, body)
            msg = self.session.republish(params)
            response = ua.RepublishResponse()
            response.NotificationMessage = msg
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CloseSecureChannelRequest_Encoding_DefaultBinary):
            _logger.info("close secure channel request (%s)", user)
            self._connection.close()
            response = ua.CloseSecureChannelResponse()
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
            return False

        elif typeid == ua.NodeId(ua.ObjectIds.CallRequest_Encoding_DefaultBinary):
            _logger.info("call request (%s)", user)
            params = struct_from_binary(ua.CallParameters, body)
            results = await self.session.call(params.MethodsToCall)
            response = ua.CallResponse()
            response.Results = results
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.SetMonitoringModeRequest_Encoding_DefaultBinary):
            _logger.info("set monitoring mode request (%s)", user)
            params = struct_from_binary(ua.SetMonitoringModeParameters, body)
            # FIXME: Implement SetMonitoringMode
            # For now send dummy results to keep clients happy
            response = ua.SetMonitoringModeResponse()
            results = ua.SetMonitoringModeResult()
            results.Results = [ua.StatusCode(ua.StatusCodes.Good) for _ in params.MonitoredItemIds]
            response.Parameters = results
            _logger.info("sending set monitoring mode response")
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.SetPublishingModeRequest_Encoding_DefaultBinary):
            _logger.info("set publishing mode request (%s)", user)
            params = struct_from_binary(ua.SetPublishingModeParameters, body)
            # FIXME: Implement SetPublishingMode
            # For now send dummy results to keep clients happy
            response = ua.SetPublishingModeResponse()
            results = ua.SetPublishingModeResult()
            results.Results = [ua.StatusCode(ua.StatusCodes.Good) for _ in params.SubscriptionIds]
            response.Parameters = results
            _logger.info("sending set publishing mode response")
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        else:
            _logger.warning("Unknown message received %s (%s)", typeid, user)
            raise ServiceError(ua.StatusCodes.BadServiceUnsupported)

        return True
461
462
    async def close(self):
        """
        Clean up after the client has disconnected.

        Logs the cleanup and, when a session exists, closes it with ``True``
        as argument to ``close_session()``.
        """
        _logger.info("Cleanup client connection: %s", self.name)
        session = self.session
        if not session:
            return
        await session.close_session(True)
470