asyncua.server.uaprocessor   F
last analyzed

Complexity

Total Complexity 70

Size/Duplication

Total Lines 488
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 370
dl 0
loc 488
rs 2.8
c 0
b 0
f 0
wmc 70

10 Methods

Rating   Name   Duplication   Size   Complexity  
A UaProcessor.__init__() 0 11 1
A UaProcessor.set_policies() 0 2 1
A UaProcessor.send_response() 0 5 1
A PublishRequestData.__init__() 0 4 1
C UaProcessor.process() 0 28 9
A UaProcessor.open_secure_channel() 0 14 2
B UaProcessor.process_message() 0 31 5
B UaProcessor.forward_publish_response() 0 24 6
A UaProcessor.close() 0 8 2
F UaProcessor._process_message() 0 329 42

How to fix   Complexity   

Complexity

Complex classes like asyncua.server.uaprocessor often do many different things. To break such a class down, first identify a cohesive component within it. A common way to find such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import time
2
import logging
3
from typing import Deque, Optional
4
from collections import deque
5
6
from asyncua import ua
7
from ..ua.ua_binary import nodeid_from_binary, struct_from_binary, struct_to_binary, uatcp_to_binary
8
from .internal_server import InternalServer, InternalSession
9
from ..common.connection import SecureConnection
10
from ..common.utils import ServiceError
11
12
_logger = logging.getLogger(__name__)
13
14
15
class PublishRequestData:
    """
    Record of a received publish request: its request header, its sequence
    header and the wall-clock time at which this record was created.
    """

    def __init__(self, requesthdr=None, seqhdr=None):
        self.requesthdr, self.seqhdr = requesthdr, seqhdr
        # Creation time in seconds since the epoch (time.time()).
        self.timestamp = time.time()
21
22
23
class UaProcessor:
24
    """
25
    Processor for OPC UA messages. Implements the OPC UA protocol for the server side.
26
    """
27
28
    def __init__(self, internal_server: InternalServer, transport):
        """
        Bind the processor to an internal server and to the transport of one client.

        :param internal_server: server instance used to create sessions and answer requests
        :param transport: transport object; must provide ``get_extra_info`` (and ``write``,
            which is used when responses are sent)
        """
        self.iserver: InternalServer = internal_server
        # Addresses of the remote and local ends, as reported by the transport
        self.name = transport.get_extra_info('peername')
        self.sockname = transport.get_extra_info('sockname')
        # No session exists until a CreateSession request has been handled
        self.session: Optional[InternalSession] = None
        self._transport = transport
        # Publish requests received and not yet answered
        self._publish_requests: Deque[PublishRequestData] = deque()
        # Publish results that are waiting for a publish request to answer
        self._publish_results: Deque[ua.PublishResult] = deque()
        self._connection = SecureConnection(ua.SecurityPolicy())
39
40
    def set_policies(self, policies):
        """
        Hand the given security policy factories over to the secure connection.
        """
        connection = self._connection
        connection.set_policy_factories(policies)
42
43
    def send_response(self, requesthandle, seqhdr, response, msgtype=ua.MessageType.SecureMessage):
        """
        Serialize `response` and write it to the transport.

        The response header receives `requesthandle`; the request id of the
        outgoing message is taken from `seqhdr`.
        """
        response.ResponseHeader.RequestHandle = requesthandle
        payload = struct_to_binary(response)
        packet = self._connection.message_to_binary(
            payload,
            message_type=msgtype,
            request_id=seqhdr.RequestId,
        )
        self._transport.write(packet)
48
49
    def open_secure_channel(self, algohdr, seqhdr, body):
        """
        Handle an OpenSecureChannel request: open (or renew) the channel on the
        secure connection and send the OpenSecureChannelResponse.
        """
        request = struct_from_binary(ua.OpenSecureChannelRequest, body)
        connection = self._connection

        # Select a policy only for a channel that is not open yet: doing it
        # again on an open channel would break the secure channel renewal.
        if not connection.is_open():
            connection.select_policy(
                algohdr.SecurityPolicyURI,
                algohdr.SenderCertificate,
                request.Parameters.SecurityMode,
            )

        response = ua.OpenSecureChannelResponse()
        response.Parameters = connection.open(request.Parameters, self.iserver)
        self.send_response(request.RequestHeader.RequestHandle, seqhdr, response, ua.MessageType.SecureOpen)
63
64
    async def forward_publish_response(self, result: ua.PublishResult):
        """
        Try to send a `PublishResponse` with the given `PublishResult`.

        Stored publish requests are consumed oldest first. Requests whose
        `TimeoutHint` (milliseconds) has elapsed since they were stored are
        dropped. If no usable request is left, `result` is queued in
        `_publish_results` instead of being sent.
        """
        while self._publish_requests:
            # We pop left from the Publish Request deque (FIFO)
            requestdata = self._publish_requests.popleft()
            timeout_ms = requestdata.requesthdr.TimeoutHint
            # A TimeoutHint of 0 is treated as "never expires"
            if timeout_ms == 0 or time.time() - requestdata.timestamp < timeout_ms / 1000:
                response = ua.PublishResponse()
                response.Parameters = result
                self.send_response(requestdata.requesthdr.RequestHandle, requestdata.seqhdr, response)
                return
            # Otherwise the request timed out: drop it and try the next one

        self._publish_results.append(result)
        _logger.info(
            "Server wants to send publish answer but no publish request is available, "
            "enqueuing notification, length of result queue is %s",
            len(self._publish_results)
        )
88
89
    async def process(self, header, body):
        """
        Handle one received header/body pair.

        Returns False when the sender could not be authenticated or when a
        SecureClose was received, the result of `process_message` for a
        SecureMessage, and True in the remaining handled cases.
        Raises ServiceError(BadTcpMessageTypeInvalid) when the decoded object is
        of an unsupported kind.
        """
        try:
            msg = self._connection.receive_from_header_and_body(header, body)
        except ua.uaerrors.BadUserAccessDenied:
            _logger.warning("Unauthenticated user attempted to connect")
            return False

        if isinstance(msg, ua.Message):
            msg_type = header.MessageType
            if msg_type == ua.MessageType.SecureOpen:
                self.open_secure_channel(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())
                return True
            if msg_type == ua.MessageType.SecureClose:
                self._connection.close()
                return False
            if msg_type == ua.MessageType.SecureMessage:
                return await self.process_message(msg.SequenceHeader(), msg.body())
            return True

        if isinstance(msg, ua.Hello):
            # Answer the Hello with an Acknowledge echoing the announced buffer sizes
            ack = ua.Acknowledge()
            ack.ReceiveBufferSize = msg.ReceiveBufferSize
            ack.SendBufferSize = msg.SendBufferSize
            self._transport.write(uatcp_to_binary(ua.MessageType.Acknowledge, ack))
            return True

        if isinstance(msg, ua.ErrorMessage):
            _logger.warning("Received an error message type")
            return True

        if msg is None:
            # msg is a ChunkType.Intermediate of an ua.MessageType.SecureMessage
            return True

        _logger.warning("Unsupported message type: %s", header.MessageType)
        raise ServiceError(ua.StatusCodes.BadTcpMessageTypeInvalid)
117
118
    async def process_message(self, seqhdr, body):
        """
        Process incoming messages.

        Decodes the request type id and the request header from `body` and
        delegates to `_process_message`. Any exception raised there is turned
        into a `ServiceFault` response carrying a matching status code.

        Returns the value of `_process_message` on success and True whenever a
        service fault has been sent.
        """
        typeid = nodeid_from_binary(body)
        requesthdr = struct_from_binary(ua.RequestHeader, body)
        _logger.debug('process_message %r %r', typeid, requesthdr)
        try:
            return await self._process_message(typeid, requesthdr, seqhdr, body)
        except ServiceError as e:
            status = ua.StatusCode(e.code)
            _logger.error("sending service fault response: %s (%s)", status.doc, status.name)
        except ua.uaerrors.BadUserAccessDenied:
            user = self.session.user if self.session else 'Someone'
            _logger.warning("%s attempted to do something they are not permitted to do", user)
            status = ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
        except Exception:
            _logger.exception('Error while processing message')
            status = ua.StatusCode(ua.StatusCodes.BadInternalError)

        # Every failure is reported the same way. Returning True here also for
        # the access-denied case keeps it consistent with the other two fault
        # paths (it previously fell through and returned None).
        response = ua.ServiceFault()
        response.ResponseHeader.ServiceResult = status
        self.send_response(requesthdr.RequestHandle, seqhdr, response)
        return True
149
150
    async def _process_message(self, typeid, requesthdr, seqhdr, body):
        """
        Dispatch one service request, identified by `typeid`, and send its response.

        `body` is the buffer from which the request parameters are decoded;
        `requesthdr` and `seqhdr` supply the request handle and request id
        used for the response.

        Returns False after a CloseSecureChannel request (and for a publish
        request without a session), True otherwise.

        Raises `BadUserAccessDenied` when a session is required but missing or
        when the permission check of the security policy rejects the request,
        `ServiceError(BadSessionIdInvalid)` when a session is activated before
        being created and `ServiceError(BadServiceUnsupported)` for an unknown
        `typeid`.
        """
        if typeid in [ua.NodeId(ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary),
                      ua.NodeId(ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary),
                      ua.NodeId(ua.ObjectIds.FindServersRequest_Encoding_DefaultBinary),
                      ua.NodeId(ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary)]:
            # These services are accepted without a session user: no user is
            # attached and no permission check is made for them.
            user = None
        elif self.session is None:
            # %s rather than %d: the identifier of a NodeId is not necessarily an integer
            _logger.warning("Received a request of type %s without an existing session", typeid.Identifier)
            raise ua.uaerrors.BadUserAccessDenied
        else:
            user = self.session.user
            if self._connection.security_policy.permissions is not None:
                if self._connection.security_policy.permissions.check_validity(user, typeid, body) is False:
                    raise ua.uaerrors.BadUserAccessDenied

        if typeid == ua.NodeId(ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary):
            _logger.info("Create session request (%s)", user)
            params = struct_from_binary(ua.CreateSessionParameters, body)
            # create the session on server
            self.session = self.iserver.create_session(self.name, external=True)
            # get a session creation result to send back
            sessiondata = await self.session.create_session(params, sockname=self.sockname)
            response = ua.CreateSessionResponse()
            response.Parameters = sessiondata
            response.Parameters.ServerCertificate = self._connection.security_policy.host_certificate
            # The server signature covers the peer certificate (if any) followed by the client nonce
            if self._connection.security_policy.peer_certificate is None:
                data = params.ClientNonce
            else:
                data = self._connection.security_policy.peer_certificate + params.ClientNonce
            response.Parameters.ServerSignature.Signature = \
                self._connection.security_policy.asymmetric_cryptography.signature(data)
            response.Parameters.ServerSignature.Algorithm = self._connection.security_policy.AsymmetricSignatureURI
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary):
            _logger.info("Close session request (%s)", user)
            if self.session:
                deletesubs = ua.ua_binary.Primitives.Boolean.unpack(body)
                await self.session.close_session(deletesubs)
            else:
                _logger.info("Request to close non-existing session (%s)", user)

            response = ua.CloseSessionResponse()
            _logger.info("sending close session response (%s)", user)
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary):
            _logger.info("Activate session request (%s)", user)
            params = struct_from_binary(ua.ActivateSessionParameters, body)
            if not self.session:
                _logger.info("request to activate non-existing session (%s)", user)
                raise ServiceError(ua.StatusCodes.BadSessionIdInvalid)
            # The client signature is verified over the host certificate (if any) followed by the session nonce
            if self._connection.security_policy.host_certificate is None:
                data = self.session.nonce
            else:
                data = self._connection.security_policy.host_certificate + self.session.nonce
            self._connection.security_policy.asymmetric_cryptography.verify(data, params.ClientSignature.Signature)
            result = self.session.activate_session(params, self._connection.security_policy.peer_certificate)
            response = ua.ActivateSessionResponse()
            response.Parameters = result
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ReadRequest_Encoding_DefaultBinary):
            _logger.info("Read request (%s)", user)
            params = struct_from_binary(ua.ReadParameters, body)
            results = await self.session.read(params)
            response = ua.ReadResponse()
            response.Results = results
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.WriteRequest_Encoding_DefaultBinary):
            _logger.info("Write request (%s)", user)
            params = struct_from_binary(ua.WriteParameters, body)
            results = await self.session.write(params)
            response = ua.WriteResponse()
            response.Results = results
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.BrowseRequest_Encoding_DefaultBinary):
            _logger.info("Browse request (%s)", user)
            params = struct_from_binary(ua.BrowseParameters, body)
            results = await self.session.browse(params)
            response = ua.BrowseResponse()
            response.Results = results
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary):
            _logger.info("get endpoints request (%s)", user)
            params = struct_from_binary(ua.GetEndpointsParameters, body)
            endpoints = await self.iserver.get_endpoints(params, sockname=self.sockname)
            response = ua.GetEndpointsResponse()
            response.Endpoints = endpoints
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.FindServersRequest_Encoding_DefaultBinary):
            _logger.info("find servers request (%s)", user)
            params = struct_from_binary(ua.FindServersParameters, body)
            servers = self.iserver.find_servers(params)
            response = ua.FindServersResponse()
            response.Servers = servers
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServerRequest_Encoding_DefaultBinary):
            _logger.info("register server request %s", user)
            serv = struct_from_binary(ua.RegisteredServer, body)
            self.iserver.register_server(serv)
            response = ua.RegisterServerResponse()
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServer2Request_Encoding_DefaultBinary):
            _logger.info("register server 2 request %s", user)
            params = struct_from_binary(ua.RegisterServer2Parameters, body)
            results = self.iserver.register_server2(params)
            response = ua.RegisterServer2Response()
            response.ConfigurationResults = results
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary):
            _logger.info("translate browsepaths to nodeids request (%s)", user)
            params = struct_from_binary(ua.TranslateBrowsePathsToNodeIdsParameters, body)
            paths = await self.session.translate_browsepaths_to_nodeids(params.BrowsePaths)
            response = ua.TranslateBrowsePathsToNodeIdsResponse()
            response.Results = paths
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary):
            _logger.info("add nodes request (%s)", user)
            params = struct_from_binary(ua.AddNodesParameters, body)
            results = await self.session.add_nodes(params.NodesToAdd)
            response = ua.AddNodesResponse()
            response.Results = results
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteNodesRequest_Encoding_DefaultBinary):
            _logger.info("delete nodes request (%s)", user)
            params = struct_from_binary(ua.DeleteNodesParameters, body)
            results = await self.session.delete_nodes(params)
            response = ua.DeleteNodesResponse()
            response.Results = results
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.AddReferencesRequest_Encoding_DefaultBinary):
            _logger.info("add references request (%s)", user)
            params = struct_from_binary(ua.AddReferencesParameters, body)
            results = await self.session.add_references(params.ReferencesToAdd)
            response = ua.AddReferencesResponse()
            response.Results = results
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteReferencesRequest_Encoding_DefaultBinary):
            _logger.info("delete references request (%s)", user)
            params = struct_from_binary(ua.DeleteReferencesParameters, body)
            results = await self.session.delete_references(params.ReferencesToDelete)
            response = ua.DeleteReferencesResponse()
            response.Parameters.Results = results
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CreateSubscriptionRequest_Encoding_DefaultBinary):
            _logger.info("create subscription request (%s)", user)
            params = struct_from_binary(ua.CreateSubscriptionParameters, body)
            # Publish results of the subscription are delivered through forward_publish_response
            result = await self.session.create_subscription(params, callback=self.forward_publish_response)
            response = ua.CreateSubscriptionResponse()
            response.Parameters = result
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ModifySubscriptionRequest_Encoding_DefaultBinary):
            _logger.info("modify subscription request")
            params = struct_from_binary(ua.ModifySubscriptionParameters, body)
            result = self.session.modify_subscription(params, self.forward_publish_response)
            response = ua.ModifySubscriptionResponse()
            response.Parameters = result
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteSubscriptionsRequest_Encoding_DefaultBinary):
            _logger.info("delete subscriptions request (%s)", user)
            params = struct_from_binary(ua.DeleteSubscriptionsParameters, body)
            results = await self.session.delete_subscriptions(params.SubscriptionIds)
            response = ua.DeleteSubscriptionsResponse()
            response.Results = results
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CreateMonitoredItemsRequest_Encoding_DefaultBinary):
            _logger.info("create monitored items request (%s)", user)
            params = struct_from_binary(ua.CreateMonitoredItemsParameters, body)
            results = await self.session.create_monitored_items(params)
            response = ua.CreateMonitoredItemsResponse()
            response.Results = results
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ModifyMonitoredItemsRequest_Encoding_DefaultBinary):
            _logger.info("modify monitored items request (%s)", user)
            params = struct_from_binary(ua.ModifyMonitoredItemsParameters, body)
            results = await self.session.modify_monitored_items(params)
            response = ua.ModifyMonitoredItemsResponse()
            response.Results = results
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteMonitoredItemsRequest_Encoding_DefaultBinary):
            _logger.info("delete monitored items request (%s)", user)
            params = struct_from_binary(ua.DeleteMonitoredItemsParameters, body)
            results = await self.session.delete_monitored_items(params)
            response = ua.DeleteMonitoredItemsResponse()
            response.Results = results
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.HistoryReadRequest_Encoding_DefaultBinary):
            _logger.info("history read request (%s)", user)
            params = struct_from_binary(ua.HistoryReadParameters, body)
            results = await self.session.history_read(params)
            response = ua.HistoryReadResponse()
            response.Results = results
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterNodesRequest_Encoding_DefaultBinary):
            _logger.info("register nodes request (%s)", user)
            params = struct_from_binary(ua.RegisterNodesParameters, body)
            _logger.info("Node registration not implemented")
            # The requested node ids are simply echoed back
            response = ua.RegisterNodesResponse()
            response.Parameters.RegisteredNodeIds = params.NodesToRegister
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.UnregisterNodesRequest_Encoding_DefaultBinary):
            _logger.info("unregister nodes request (%s)", user)
            # The parameters are still decoded, but nothing is done with them
            struct_from_binary(ua.UnregisterNodesParameters, body)
            response = ua.UnregisterNodesResponse()
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.PublishRequest_Encoding_DefaultBinary):
            _logger.debug("publish request (%s)", user)
            if not self.session:
                return False
            params = struct_from_binary(ua.PublishParameters, body)
            data = PublishRequestData(requesthdr=requesthdr, seqhdr=seqhdr)
            # Store the Publish Request (will be used to send publish answers from server)
            self._publish_requests.append(data)
            # If there is an enqueued result forward it immediately
            while self._publish_results:
                result = self._publish_results.popleft()
                if result.SubscriptionId not in self.session.subscription_service.active_subscription_ids:
                    # Discard the result if the subscription is no longer active
                    continue
                await self.forward_publish_response(result)
                break
            self.session.publish(params.SubscriptionAcknowledgements)

        elif typeid == ua.NodeId(ua.ObjectIds.RepublishRequest_Encoding_DefaultBinary):
            _logger.info("re-publish request (%s)", user)
            params = struct_from_binary(ua.RepublishParameters, body)
            msg = self.session.republish(params)
            response = ua.RepublishResponse()
            response.NotificationMessage = msg
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CloseSecureChannelRequest_Encoding_DefaultBinary):
            _logger.info("close secure channel request (%s)", user)
            self._connection.close()
            response = ua.CloseSecureChannelResponse()
            self.send_response(requesthdr.RequestHandle, seqhdr, response)
            return False

        elif typeid == ua.NodeId(ua.ObjectIds.CallRequest_Encoding_DefaultBinary):
            _logger.info("call request (%s)", user)
            params = struct_from_binary(ua.CallParameters, body)
            results = await self.session.call(params.MethodsToCall)
            response = ua.CallResponse()
            response.Results = results
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.SetMonitoringModeRequest_Encoding_DefaultBinary):
            _logger.info("set monitoring mode request (%s)", user)
            params = struct_from_binary(ua.SetMonitoringModeParameters, body)
            # FIXME: Implement SetMonitoringMode
            # For now send dummy results to keep clients happy
            response = ua.SetMonitoringModeResponse()
            results = ua.SetMonitoringModeResult()
            # One Good status per monitored item id of the request
            results.Results = [ua.StatusCode(ua.StatusCodes.Good) for _ in params.MonitoredItemIds]
            response.Parameters = results
            _logger.info("sending set monitoring mode response")
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.SetPublishingModeRequest_Encoding_DefaultBinary):
            _logger.info("set publishing mode request (%s)", user)
            params = struct_from_binary(ua.SetPublishingModeParameters, body)
            # FIXME: Implement SetPublishingMode
            # For now send dummy results to keep clients happy
            response = ua.SetPublishingModeResponse()
            results = ua.SetPublishingModeResult()
            # One Good status per subscription id of the request
            results.Results = [ua.StatusCode(ua.StatusCodes.Good) for _ in params.SubscriptionIds]
            response.Parameters = results
            _logger.info("sending set publishing mode response")
            self.send_response(requesthdr.RequestHandle, seqhdr, response)

        else:
            _logger.warning("Unknown message received %s (%s)", typeid, user)
            raise ServiceError(ua.StatusCodes.BadServiceUnsupported)

        return True
479
480
    async def close(self):
        """
        Clean up after the client has disconnected: if a session exists, it is
        closed with its `close_session` argument set to True.
        """
        _logger.info("Cleanup client connection: %s", self.name)
        if not self.session:
            return
        await self.session.close_session(True)
488