Passed
Pull Request — master (#41)
by Olivier
02:17
created

asyncua.server.uaprocessor.UaProcessor.__init__()   A

Complexity

Conditions 1

Size

Total Lines 11
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 9
nop 3
dl 0
loc 11
rs 9.95
c 0
b 0
f 0
1
import time
2
import logging
3
from typing import Deque
4
from collections import deque
5
6
from asyncua import ua
7
from ..ua.ua_binary import nodeid_from_binary, struct_from_binary, struct_to_binary, uatcp_to_binary
8
from .internal_server import InternalServer, InternalSession
9
from ..common.connection import SecureConnection
10
from ..common.utils import ServiceError
11
12
_logger = logging.getLogger(__name__)
13
14
15
class PublishRequestData:
    """
    Record of one pending PublishRequest.

    Keeps the request, security and sequence headers of the request together
    with the wall-clock time (``time.time()``) at which the record was created.
    """

    def __init__(self, requesthdr=None, algohdr=None, seqhdr=None):
        self.requesthdr, self.algohdr, self.seqhdr = requesthdr, algohdr, seqhdr
        # Creation time in seconds since the epoch
        self.timestamp = time.time()
22
23
24
class UaProcessor:
25
    """
26
    ToDo: remove/replace Lock
27
    ToDo: Refactor queues with asyncio.Queue
28
    """
29
30
    def __init__(self, internal_server: InternalServer, transport):
        """
        Bind the processor to an internal server and one client transport.

        :param internal_server: server backend the requests are forwarded to
        :param transport: object providing ``get_extra_info()`` and ``write()``
        """
        self.iserver: InternalServer = internal_server
        # Peer and local addresses as reported by the transport
        self.name = transport.get_extra_info('peername')
        self.sockname = transport.get_extra_info('sockname')
        # No session until a CreateSessionRequest has been processed
        self.session: InternalSession = None
        self._transport = transport
        self._connection = SecureConnection(ua.SecurityPolicy())
        # FIFO of received PublishRequests that have not been answered yet
        self._publish_requests: Deque[PublishRequestData] = deque()
        # Publish results that arrived while no PublishRequest was available
        self._publish_results = deque()
41
42
    def set_policies(self, policies):
        """
        Hand the given security policy factories over to the secure connection.
        """
        connection = self._connection
        connection.set_policy_factories(policies)
44
45
    def send_response(self, requesthandle, algohdr, seqhdr, response, msgtype=ua.MessageType.SecureMessage):
        """
        Serialize `response` and write it to the transport.

        The response header is stamped with `requesthandle` and the message is
        built with the request id taken from `seqhdr`.
        """
        response.ResponseHeader.RequestHandle = requesthandle
        payload = struct_to_binary(response)
        packet = self._connection.message_to_binary(
            payload,
            message_type=msgtype,
            request_id=seqhdr.RequestId,
            algohdr=algohdr,
        )
        self._transport.write(packet)
51
52
    def open_secure_channel(self, algohdr, seqhdr, body):
        """
        Handle an OpenSecureChannel request and send the matching response.

        The security policy is selected from the values in `algohdr` and the
        requested security mode before the channel is opened.
        """
        request = struct_from_binary(ua.OpenSecureChannelRequest, body)

        policy_uri = algohdr.SecurityPolicyURI
        sender_certificate = algohdr.SenderCertificate
        security_mode = request.Parameters.SecurityMode
        self._connection.select_policy(policy_uri, sender_certificate, security_mode)

        channel = self._connection.open(request.Parameters, self.iserver)

        # The answer carries no algorithm header and uses the SecureOpen message type
        reply = ua.OpenSecureChannelResponse()
        reply.Parameters = channel
        handle = request.RequestHeader.RequestHandle
        self.send_response(handle, None, seqhdr, reply, ua.MessageType.SecureOpen)
63
64
    def forward_publish_response(self, result):
        """
        Try to send a `PublishResponse` for the given result.

        Pending PublishRequests are consumed in FIFO order. Requests whose
        `TimeoutHint` (milliseconds, 0 meaning "no timeout") has elapsed are
        dropped. If no usable request is left, `result` is appended to the
        result queue so it can be sent once a new PublishRequest arrives.
        """
        _logger.info("forward publish response %s", result)
        while self._publish_requests:
            # We pop left from the Publish Request deque (FIFO)
            requestdata = self._publish_requests.popleft()
            timeout_ms = requestdata.requesthdr.TimeoutHint
            if timeout_ms == 0 or time.time() - requestdata.timestamp < timeout_ms / 1000:
                # Use `requestdata` only if it has not timed out
                break
        else:
            # Deque exhausted without finding a usable request
            self._publish_results.append(result)
            _logger.info(
                "Server wants to send publish answer but no publish request is available, "
                "enqueuing notification, length of result queue is %s",
                len(self._publish_results)
            )
            return
        response = ua.PublishResponse()
        response.Parameters = result
        self.send_response(requestdata.requesthdr.RequestHandle, requestdata.algohdr, requestdata.seqhdr, response)
88
89
    async def process(self, header, body):
        """
        Process one received chunk given as `header` and `body`.

        Returns False when a SecureClose message was handled, the result of
        `process_message` for a SecureMessage, and True otherwise.
        Raises `ServiceError` (BadTcpMessageTypeInvalid) for an unsupported message.
        """
        msg = self._connection.receive_from_header_and_body(header, body)

        if isinstance(msg, ua.Message):
            if header.MessageType == ua.MessageType.SecureOpen:
                self.open_secure_channel(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())
                return True
            if header.MessageType == ua.MessageType.SecureClose:
                self._connection.close()
                return False
            if header.MessageType == ua.MessageType.SecureMessage:
                return await self.process_message(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())
            return True

        if isinstance(msg, ua.Hello):
            # Answer the Hello with an Acknowledge echoing the announced buffer sizes
            ack = ua.Acknowledge()
            ack.ReceiveBufferSize = msg.ReceiveBufferSize
            ack.SendBufferSize = msg.SendBufferSize
            self._transport.write(uatcp_to_binary(ua.MessageType.Acknowledge, ack))
            return True

        if isinstance(msg, ua.ErrorMessage):
            _logger.warning("Received an error message type")
            return True

        if msg is None:
            # msg is a ChunkType.Intermediate of an ua.MessageType.SecureMessage
            return True

        _logger.warning("Unsupported message type: %s", header.MessageType)
        raise ServiceError(ua.StatusCodes.BadTcpMessageTypeInvalid)
113
114
    async def process_message(self, algohdr, seqhdr, body):
        """
        Decode the type id and request header of a message and dispatch it.

        Any exception raised while handling the request is answered with a
        `ServiceFault`: a `ServiceError` reports its own status code, every
        other exception is reported as BadInternalError. True is returned
        after a fault has been sent.
        """
        typeid = nodeid_from_binary(body)
        requesthdr = struct_from_binary(ua.RequestHeader, body)
        _logger.debug('process_message %r %r', typeid, requesthdr)
        try:
            return await self._process_message(typeid, requesthdr, algohdr, seqhdr, body)
        except ServiceError as e:
            fault_status = ua.StatusCode(e.code)
            _logger.error("sending service fault response: %s (%s)", fault_status.doc, fault_status.name)
        except Exception:
            _logger.exception('Error while processing message')
            fault_status = ua.StatusCode(ua.StatusCodes.BadInternalError)
        # Only reached when one of the handlers above ran
        fault = ua.ServiceFault()
        fault.ResponseHeader.ServiceResult = fault_status
        self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, fault)
        return True
136
137
    async def _process_message(self, typeid, requesthdr, algohdr, seqhdr, body):
        """
        Dispatch a service request according to its binary encoding type id.

        Each branch decodes the request parameters from `body`, calls the
        session or the internal server and sends the matching response.

        Returns True once the request has been handled. Returns False for a
        CloseSecureChannel request and for a Publish request received while
        no session exists.

        Raises `ServiceError` with BadSessionIdInvalid when a session that
        does not exist is activated, and with BadNotImplemented for an
        unknown type id.
        """
        if typeid == ua.NodeId(ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary):
            _logger.info("Create session request")
            params = struct_from_binary(ua.CreateSessionParameters, body)
            # create the session on server
            self.session = self.iserver.create_session(self.name, external=True)
            # get a session creation result to send back
            sessiondata = await self.session.create_session(params, sockname=self.sockname)
            response = ua.CreateSessionResponse()
            response.Parameters = sessiondata
            response.Parameters.ServerCertificate = self._connection.security_policy.client_certificate
            # The signed data is the certificate (when there is one) followed by the client nonce
            if self._connection.security_policy.server_certificate is None:
                data = params.ClientNonce
            else:
                data = self._connection.security_policy.server_certificate + params.ClientNonce
            response.Parameters.ServerSignature.Signature = \
                self._connection.security_policy.asymmetric_cryptography.signature(data)
            # NOTE(review): the algorithm URI is hard-coded whatever policy is selected - confirm
            response.Parameters.ServerSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
            _logger.info("sending create session response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary):
            _logger.info("Close session request")
            if self.session:
                deletesubs = ua.ua_binary.Primitives.Boolean.unpack(body)
                await self.session.close_session(deletesubs)
            else:
                _logger.info("Request to close non-existing session")

            response = ua.CloseSessionResponse()
            _logger.info("sending close session response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary):
            _logger.info("Activate session request")
            params = struct_from_binary(ua.ActivateSessionParameters, body)
            if not self.session:
                _logger.info("request to activate non-existing session")
                raise ServiceError(ua.StatusCodes.BadSessionIdInvalid)
            # The client signature is verified over the certificate (when there is one) plus the session nonce
            if self._connection.security_policy.client_certificate is None:
                data = self.session.nonce
            else:
                data = self._connection.security_policy.client_certificate + self.session.nonce
            self._connection.security_policy.asymmetric_cryptography.verify(data, params.ClientSignature.Signature)
            result = self.session.activate_session(params)
            response = ua.ActivateSessionResponse()
            response.Parameters = result
            _logger.info("sending activate session response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ReadRequest_Encoding_DefaultBinary):
            _logger.info("Read request")
            params = struct_from_binary(ua.ReadParameters, body)
            results = await self.session.read(params)
            response = ua.ReadResponse()
            response.Results = results
            _logger.info("sending read response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.WriteRequest_Encoding_DefaultBinary):
            _logger.info("Write request")
            params = struct_from_binary(ua.WriteParameters, body)
            results = await self.session.write(params)
            response = ua.WriteResponse()
            response.Results = results
            _logger.info("sending write response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.BrowseRequest_Encoding_DefaultBinary):
            _logger.info("Browse request")
            params = struct_from_binary(ua.BrowseParameters, body)
            results = await self.session.browse(params)
            response = ua.BrowseResponse()
            response.Results = results
            _logger.info("sending browse response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary):
            _logger.info("get endpoints request")
            params = struct_from_binary(ua.GetEndpointsParameters, body)
            endpoints = await self.iserver.get_endpoints(params, sockname=self.sockname)
            response = ua.GetEndpointsResponse()
            response.Endpoints = endpoints
            _logger.info("sending get endpoints response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.FindServersRequest_Encoding_DefaultBinary):
            _logger.info("find servers request")
            params = struct_from_binary(ua.FindServersParameters, body)
            servers = self.iserver.find_servers(params)
            response = ua.FindServersResponse()
            response.Servers = servers
            _logger.info("sending find servers response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServerRequest_Encoding_DefaultBinary):
            _logger.info("register server request")
            serv = struct_from_binary(ua.RegisteredServer, body)
            self.iserver.register_server(serv)
            response = ua.RegisterServerResponse()
            _logger.info("sending register server response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServer2Request_Encoding_DefaultBinary):
            _logger.info("register server 2 request")
            params = struct_from_binary(ua.RegisterServer2Parameters, body)
            results = self.iserver.register_server2(params)
            response = ua.RegisterServer2Response()
            response.ConfigurationResults = results
            _logger.info("sending register server 2 response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary):
            _logger.info("translate browsepaths to nodeids request")
            params = struct_from_binary(ua.TranslateBrowsePathsToNodeIdsParameters, body)
            paths = await self.session.translate_browsepaths_to_nodeids(params.BrowsePaths)
            response = ua.TranslateBrowsePathsToNodeIdsResponse()
            response.Results = paths
            _logger.info("sending translate browsepaths to nodeids response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary):
            _logger.info("add nodes request")
            params = struct_from_binary(ua.AddNodesParameters, body)
            results = await self.session.add_nodes(params.NodesToAdd)
            response = ua.AddNodesResponse()
            response.Results = results
            _logger.info("sending add node response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteNodesRequest_Encoding_DefaultBinary):
            _logger.info("delete nodes request")
            params = struct_from_binary(ua.DeleteNodesParameters, body)
            results = await self.session.delete_nodes(params)
            response = ua.DeleteNodesResponse()
            response.Results = results
            _logger.info("sending delete node response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.AddReferencesRequest_Encoding_DefaultBinary):
            _logger.info("add references request")
            params = struct_from_binary(ua.AddReferencesParameters, body)
            results = await self.session.add_references(params.ReferencesToAdd)
            response = ua.AddReferencesResponse()
            response.Results = results
            _logger.info("sending add references response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteReferencesRequest_Encoding_DefaultBinary):
            _logger.info("delete references request")
            params = struct_from_binary(ua.DeleteReferencesParameters, body)
            results = await self.session.delete_references(params.ReferencesToDelete)
            response = ua.DeleteReferencesResponse()
            response.Parameters.Results = results
            _logger.info("sending delete references response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CreateSubscriptionRequest_Encoding_DefaultBinary):
            _logger.info("create subscription request")
            params = struct_from_binary(ua.CreateSubscriptionParameters, body)
            # Publish results of the new subscription are delivered through forward_publish_response
            result = await self.session.create_subscription(params, self.forward_publish_response)
            response = ua.CreateSubscriptionResponse()
            response.Parameters = result
            _logger.info("sending create subscription response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteSubscriptionsRequest_Encoding_DefaultBinary):
            _logger.info("delete subscriptions request")
            params = struct_from_binary(ua.DeleteSubscriptionsParameters, body)
            results = await self.session.delete_subscriptions(params.SubscriptionIds)
            response = ua.DeleteSubscriptionsResponse()
            response.Results = results
            _logger.info("sending delete subscriptions response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CreateMonitoredItemsRequest_Encoding_DefaultBinary):
            _logger.info("create monitored items request")
            params = struct_from_binary(ua.CreateMonitoredItemsParameters, body)
            results = await self.session.create_monitored_items(params)
            response = ua.CreateMonitoredItemsResponse()
            response.Results = results
            _logger.info("sending create monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ModifyMonitoredItemsRequest_Encoding_DefaultBinary):
            _logger.info("modify monitored items request")
            params = struct_from_binary(ua.ModifyMonitoredItemsParameters, body)
            results = await self.session.modify_monitored_items(params)
            response = ua.ModifyMonitoredItemsResponse()
            response.Results = results
            _logger.info("sending modify monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteMonitoredItemsRequest_Encoding_DefaultBinary):
            _logger.info("delete monitored items request")
            params = struct_from_binary(ua.DeleteMonitoredItemsParameters, body)
            results = await self.session.delete_monitored_items(params)
            response = ua.DeleteMonitoredItemsResponse()
            response.Results = results
            _logger.info("sending delete monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.HistoryReadRequest_Encoding_DefaultBinary):
            _logger.info("history read request")
            params = struct_from_binary(ua.HistoryReadParameters, body)
            results = await self.session.history_read(params)
            response = ua.HistoryReadResponse()
            response.Results = results
            _logger.info("sending history read response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterNodesRequest_Encoding_DefaultBinary):
            _logger.info("register nodes request")
            params = struct_from_binary(ua.RegisterNodesParameters, body)
            _logger.info("Node registration not implemented")
            # The requested node ids are echoed back unchanged
            response = ua.RegisterNodesResponse()
            response.Parameters.RegisteredNodeIds = params.NodesToRegister
            _logger.info("sending register nodes response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.UnregisterNodesRequest_Encoding_DefaultBinary):
            _logger.info("unregister nodes request")
            # The parameters are decoded but not used; an empty response is sent
            params = struct_from_binary(ua.UnregisterNodesParameters, body)
            response = ua.UnregisterNodesResponse()
            _logger.info("sending unregister nodes response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.PublishRequest_Encoding_DefaultBinary):
            _logger.info("publish request")
            if not self.session:
                # No response is sent in this case
                return False
            params = struct_from_binary(ua.PublishParameters, body)
            data = PublishRequestData(requesthdr=requesthdr, seqhdr=seqhdr, algohdr=algohdr)
            # Store the Publish Request (will be used to send publish answers from server)
            self._publish_requests.append(data)
            # If there is an enqueued result forward it immediately
            if self._publish_results:
                result = self._publish_results.popleft()
                self.forward_publish_response(result)
            await self.session.publish(params.SubscriptionAcknowledgements)
            _logger.info("publish forward to server")

        elif typeid == ua.NodeId(ua.ObjectIds.RepublishRequest_Encoding_DefaultBinary):
            _logger.info("re-publish request")
            params = struct_from_binary(ua.RepublishParameters, body)
            msg = self.session.republish(params)
            response = ua.RepublishResponse()
            response.NotificationMessage = msg
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CloseSecureChannelRequest_Encoding_DefaultBinary):
            _logger.info("close secure channel request")
            self._connection.close()
            response = ua.CloseSecureChannelResponse()
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
            return False

        elif typeid == ua.NodeId(ua.ObjectIds.CallRequest_Encoding_DefaultBinary):
            _logger.info("call request")
            params = struct_from_binary(ua.CallParameters, body)
            results = await self.session.call(params.MethodsToCall)
            response = ua.CallResponse()
            response.Results = results
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
        else:
            _logger.warning("Unknown message received %s", typeid)
            raise ServiceError(ua.StatusCodes.BadNotImplemented)

        return True
406
407
    async def close(self):
        """
        Clean up after a client has disconnected.

        If a session exists it is closed with ``True`` passed to `close_session`.
        """
        _logger.info("Cleanup client connection: %s", self.name)
        if not self.session:
            return
        await self.session.close_session(True)
415