Completed
Push — master ( 6a68a0...bac760 )
by Olivier
02:36
created

opcua.server.UaProcessor   C

Complexity

Total Complexity 56

Size/Duplication

Total Lines 415
Duplicated Lines 0 %

Test Coverage

Coverage 84.23%
Metric Value
dl 0
loc 415
ccs 251
cts 298
cp 0.8423
rs 5.5555
wmc 56

10 Methods

Rating   Name   Duplication   Size   Complexity  
A UaProcessor.__init__() 0 13 1
A UaProcessor.set_policies() 0 2 1
F UaProcessor._process_message() 0 297 30
A UaProcessor.close() 0 8 2
A UaProcessor._open_secure_channel() 0 14 3
A UaProcessor.process_message() 0 12 2
F UaProcessor.process() 0 28 9
A UaProcessor.send_response() 0 5 2
B UaProcessor.forward_publish_response() 0 16 5
A UaProcessor.open_secure_channel() 0 9 1

How to fix   Complexity   

Complex Class

Complex classes like opcua.server.UaProcessor often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
2 1
import logging
3 1
from threading import RLock, Lock
4 1
from datetime import datetime
5 1
import time
6
7 1
from opcua import ua
8 1
from opcua.common import utils
9
10
11 1
class PublishRequestData(object):
    """Record describing one queued PublishRequest.

    The three header slots start out empty and are filled in by the code
    that creates the record; ``timestamp`` captures the creation time
    (seconds, from ``time.time()``).
    """

    def __init__(self):
        # Headers of the originating request; populated by the creator.
        self.requesthdr = self.algohdr = self.seqhdr = None
        # Creation time of this record.
        self.timestamp = time.time()
18
19
20 1
class UaProcessor(object):
21
22 1
    def __init__(self, internal_server, socket):
        """Initialise the per-connection state.

        :param internal_server: object stored as ``self.iserver``.
        :param socket: transport-like object; ``get_extra_info()`` is called
            on it here and it is stored as ``self.socket``.
        """
        self.logger = logging.getLogger(__name__)
        self.iserver = internal_server

        # Peer and local addresses as reported by the transport.
        self.name = socket.get_extra_info('peername')
        self.sockname = socket.get_extra_info('sockname')

        # Nothing has been negotiated yet.
        self.session = self.channel = None

        self.socket = socket
        self._socketlock = Lock()
        self._datalock = RLock()

        # Pending PublishRequests, oldest first.
        self._publishdata_queue = []
        # used when we need to wait for PublishRequest
        self._publish_result_queue = []

        default_policy = ua.SecurityPolicy()
        self._connection = ua.SecureConnection(default_policy)
35
36 1
    def set_policies(self, policies):
        """Hand ``policies`` to the underlying secure connection as its policy factories."""
        connection = self._connection
        connection.set_policy_factories(policies)
38
39 1
    def send_response(self, requesthandle, algohdr, seqhdr, response, msgtype=ua.MessageType.SecureMessage):
        """Serialise ``response`` and write it to the socket.

        :param requesthandle: value copied into the response header's
            ``RequestHandle``.
        :param algohdr: not used by this method; kept for the call signature.
        :param seqhdr: sequence header of the request; its ``RequestId`` is
            passed on to the connection when framing the message.
        :param response: response object exposing ``ResponseHeader`` and
            ``to_binary()``.
        :param msgtype: message type used for framing.
        """
        # The whole build-and-write sequence runs under the socket lock.
        with self._socketlock:
            response.ResponseHeader.RequestHandle = requesthandle
            payload = response.to_binary()
            packet = self._connection.message_to_binary(payload, msgtype, seqhdr.RequestId)
            self.socket.write(packet)
44
45 1
    def open_secure_channel(self, algohdr, seqhdr, body):
        """Handle an OpenSecureChannel request and send the response.

        :param algohdr: security header of the request; supplies the policy
            URI and sender certificate used to select the security policy.
        :param seqhdr: sequence header of the request.
        :param body: buffer the OpenSecureChannelRequest is decoded from.
        """
        request = ua.OpenSecureChannelRequest.from_binary(body)
        params = request.Parameters

        # The policy must be selected before the channel (and its keys) is set up.
        self._connection.select_policy(
            algohdr.SecurityPolicyURI,
            algohdr.SenderCertificate,
            params.SecurityMode,
        )
        channel_result = self._open_secure_channel(params)

        # send response
        response = ua.OpenSecureChannelResponse()
        response.Parameters = channel_result
        self.send_response(
            request.RequestHeader.RequestHandle,
            algohdr,
            seqhdr,
            response,
            ua.MessageType.SecureOpen,
        )
54
55 1
    def forward_publish_response(self, result):
        """Send ``result`` as a PublishResponse to the oldest usable PublishRequest.

        Queued PublishRequests are consumed oldest first. Requests whose
        ``TimeoutHint`` (milliseconds) has elapsed are discarded; a
        ``TimeoutHint`` of 0 means the request never times out. If no usable
        request is queued, ``result`` is stored in the result queue so it can
        be sent when the next PublishRequest arrives.

        :param result: publish result to place in the response's ``Parameters``.
        """
        self.logger.info("forward publish response %s", result)
        with self._datalock:
            while True:
                if not self._publishdata_queue:
                    self._publish_result_queue.append(result)
                    self.logger.info("Server wants to send publish answer but no publish request is available, enqueing notification, length of result queue is %s", len(self._publish_result_queue))
                    return
                requestdata = self._publishdata_queue.pop(0)
                timeout_ms = requestdata.requesthdr.TimeoutHint
                # A zero hint means "no timeout": such a request is always usable.
                # Divide by a float so the comparison is not truncated to whole
                # seconds under Python 2 integer division.
                if not timeout_ms or time.time() - requestdata.timestamp < timeout_ms / 1000.0:
                    break

        response = ua.PublishResponse()
        response.Parameters = result

        self.send_response(requestdata.requesthdr.RequestHandle, requestdata.algohdr, requestdata.seqhdr, response)
71
72 1
    def process(self, header, body):
        """Decode one incoming message and dispatch it by kind.

        :param header: message header; ``MessageType`` and ``ChannelId`` are
            read from it.
        :param body: message body passed to the connection for decoding.
        :returns: ``False`` for a SecureClose message, the result of
            ``process_message()`` for a SecureMessage, otherwise ``True``.
        :raises utils.ServiceError: ``BadTcpMessageTypeInvalid`` when the
            decoded message is of none of the handled kinds.
        """
        msg = self._connection.receive_from_header_and_body(header, body)

        if isinstance(msg, ua.Message):
            msgtype = header.MessageType
            if msgtype == ua.MessageType.SecureOpen:
                self.open_secure_channel(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())
            elif msgtype == ua.MessageType.SecureClose:
                channel_matches = self.channel and header.ChannelId == self.channel.SecurityToken.ChannelId
                if not channel_matches:
                    self.logger.warning("Request to close channel %s which was not issued, current channel is %s", header.ChannelId, self.channel)
                return False
            elif msgtype == ua.MessageType.SecureMessage:
                return self.process_message(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())
            return True

        if isinstance(msg, ua.Hello):
            # Acknowledge with the buffer sizes the client announced.
            ack = ua.Acknowledge()
            ack.ReceiveBufferSize = msg.ReceiveBufferSize
            ack.SendBufferSize = msg.SendBufferSize
            self.socket.write(self._connection.tcp_to_binary(ua.MessageType.Acknowledge, ack))
            return True

        if isinstance(msg, ua.ErrorMessage):
            self.logger.warning("Received an error message type")
            return True

        self.logger.warning("Unsupported message type: %s", header.MessageType)
        raise utils.ServiceError(ua.StatusCodes.BadTcpMessageTypeInvalid)
100
101 1
    def process_message(self, algohdr, seqhdr, body):
        """Decode the request type and header, then run the service handler.

        A ``utils.ServiceError`` raised by the handler is turned into a
        ServiceFault response carrying the error's status code.

        :param algohdr: security header of the request.
        :param seqhdr: sequence header of the request.
        :param body: buffer holding the encoded request.
        :returns: the handler's return value, or ``True`` after a
            ServiceFault has been sent.
        """
        typeid = ua.NodeId.from_binary(body)
        requesthdr = ua.RequestHeader.from_binary(body)
        try:
            outcome = self._process_message(typeid, requesthdr, algohdr, seqhdr, body)
        except utils.ServiceError as error:
            status = ua.StatusCode(error.code)
            fault = ua.ServiceFault()
            fault.ResponseHeader.ServiceResult = status
            self.logger.info("sending service fault response: %s (%s)", status.doc, status.name)
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, fault)
            outcome = True
        return outcome
113
114 1
    def _process_message(self, typeid, requesthdr, algohdr, seqhdr, body):
        """Run the service identified by ``typeid`` and send its response.

        :param typeid: NodeId of the request's binary encoding.
        :param requesthdr: decoded request header; its ``RequestHandle`` is
            echoed in the response.
        :param algohdr: security header of the request.
        :param seqhdr: sequence header of the request.
        :param body: buffer the service parameters are decoded from.
        :returns: ``False`` for a CloseSecureChannel request and for a
            Publish request received without a session; ``True`` otherwise.
        :raises utils.ServiceError: ``BadSessionIdInvalid`` when activating a
            session that does not exist, ``BadNotImplemented`` for an
            unknown request type.
        """

        def is_request(object_id):
            # True when the incoming type id matches the given encoding id.
            return typeid == ua.NodeId(object_id)

        def reply(response):
            # Every service answers on the headers of the request being handled.
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        ids = ua.ObjectIds

        if is_request(ids.CreateSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Create session request")
            params = ua.CreateSessionParameters.from_binary(body)

            self.session = self.iserver.create_session(self.name, external=True)  # create the session on server
            sessiondata = self.session.create_session(params, sockname=self.sockname)  # get a session creation result to send back

            response = ua.CreateSessionResponse()
            response.Parameters = sessiondata
            policy = self._connection._security_policy
            # NOTE(review): the policy's `client_certificate` is sent as the
            # ServerCertificate and `server_certificate` is signed together
            # with the client nonce - presumably the policy names are from
            # the client's point of view; confirm against the policy classes.
            response.Parameters.ServerCertificate = policy.client_certificate
            if policy.server_certificate is None:
                data = params.ClientNonce
            else:
                data = policy.server_certificate + params.ClientNonce
            response.Parameters.ServerSignature.Signature = policy.asymmetric_cryptography.signature(data)
            response.Parameters.ServerSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"

            self.logger.info("sending create sesssion response")
            reply(response)

        elif is_request(ids.CloseSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Close session request")
            deletesubs = ua.unpack_uatype('Boolean', body)

            # A close request may arrive before any session was created;
            # answer it instead of failing on the missing session.
            if self.session:
                self.session.close_session(deletesubs)
            else:
                self.logger.info("request to close non-existing session")

            response = ua.CloseSessionResponse()
            self.logger.info("sending close sesssion response")
            reply(response)

        elif is_request(ids.ActivateSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Activate session request")
            params = ua.ActivateSessionParameters.from_binary(body)

            if not self.session:
                self.logger.info("request to activate non-existing session")
                raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)

            policy = self._connection._security_policy
            if policy.client_certificate is None:
                data = self.session.nonce
            else:
                data = policy.client_certificate + self.session.nonce
            policy.asymmetric_cryptography.verify(data, params.ClientSignature.Signature)

            result = self.session.activate_session(params)

            response = ua.ActivateSessionResponse()
            response.Parameters = result

            self.logger.info("sending activate session response")
            reply(response)

        elif is_request(ids.ReadRequest_Encoding_DefaultBinary):
            self.logger.info("Read request")
            params = ua.ReadParameters.from_binary(body)

            results = self.session.read(params)

            response = ua.ReadResponse()
            response.Results = results

            self.logger.info("sending read response")
            reply(response)

        elif is_request(ids.WriteRequest_Encoding_DefaultBinary):
            self.logger.info("Write request")
            params = ua.WriteParameters.from_binary(body)

            results = self.session.write(params)

            response = ua.WriteResponse()
            response.Results = results

            self.logger.info("sending write response")
            reply(response)

        elif is_request(ids.BrowseRequest_Encoding_DefaultBinary):
            self.logger.info("Browse request")
            params = ua.BrowseParameters.from_binary(body)

            results = self.session.browse(params)

            response = ua.BrowseResponse()
            response.Results = results

            self.logger.info("sending browse response")
            reply(response)

        elif is_request(ids.GetEndpointsRequest_Encoding_DefaultBinary):
            self.logger.info("get endpoints request")
            params = ua.GetEndpointsParameters.from_binary(body)

            endpoints = self.iserver.get_endpoints(params, sockname=self.sockname)

            response = ua.GetEndpointsResponse()
            response.Endpoints = endpoints

            self.logger.info("sending get endpoints response")
            reply(response)

        elif is_request(ids.FindServersRequest_Encoding_DefaultBinary):
            self.logger.info("find servers request")
            params = ua.FindServersParameters.from_binary(body)

            servers = self.iserver.find_servers(params)

            response = ua.FindServersResponse()
            response.Servers = servers

            self.logger.info("sending find servers response")
            reply(response)

        elif is_request(ids.RegisterServerRequest_Encoding_DefaultBinary):
            self.logger.info("register server request")
            serv = ua.RegisteredServer.from_binary(body)

            self.iserver.register_server(serv)

            response = ua.RegisterServerResponse()

            self.logger.info("sending register server response")
            reply(response)

        elif is_request(ids.RegisterServer2Request_Encoding_DefaultBinary):
            self.logger.info("register server 2 request")
            params = ua.RegisterServer2Parameters.from_binary(body)

            results = self.iserver.register_server2(params)

            response = ua.RegisterServer2Response()
            response.ConfigurationResults = results

            self.logger.info("sending register server 2 response")
            reply(response)

        elif is_request(ids.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary):
            self.logger.info("translate browsepaths to nodeids request")
            params = ua.TranslateBrowsePathsToNodeIdsParameters.from_binary(body)

            paths = self.session.translate_browsepaths_to_nodeids(params.BrowsePaths)

            response = ua.TranslateBrowsePathsToNodeIdsResponse()
            response.Results = paths

            self.logger.info("sending translate browsepaths to nodeids response")
            reply(response)

        elif is_request(ids.AddNodesRequest_Encoding_DefaultBinary):
            self.logger.info("add nodes request")
            params = ua.AddNodesParameters.from_binary(body)

            results = self.session.add_nodes(params.NodesToAdd)

            response = ua.AddNodesResponse()
            response.Results = results

            self.logger.info("sending add node response")
            reply(response)

        elif is_request(ids.DeleteNodesRequest_Encoding_DefaultBinary):
            self.logger.info("delete nodes request")
            params = ua.DeleteNodesParameters.from_binary(body)

            results = self.session.delete_nodes(params.NodesToDelete)

            response = ua.DeleteNodesResponse()
            response.Results = results

            self.logger.info("sending delete node response")
            reply(response)

        elif is_request(ids.CreateSubscriptionRequest_Encoding_DefaultBinary):
            self.logger.info("create subscription request")
            params = ua.CreateSubscriptionParameters.from_binary(body)

            # Publish results for this subscription come back through
            # forward_publish_response.
            result = self.session.create_subscription(params, self.forward_publish_response)

            response = ua.CreateSubscriptionResponse()
            response.Parameters = result

            self.logger.info("sending create subscription response")
            reply(response)

        elif is_request(ids.DeleteSubscriptionsRequest_Encoding_DefaultBinary):
            self.logger.info("delete subscriptions request")
            params = ua.DeleteSubscriptionsParameters.from_binary(body)

            results = self.session.delete_subscriptions(params.SubscriptionIds)

            response = ua.DeleteSubscriptionsResponse()
            response.Results = results

            self.logger.info("sending delte subscription response")
            reply(response)

        elif is_request(ids.CreateMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("create monitored items request")
            params = ua.CreateMonitoredItemsParameters.from_binary(body)
            results = self.session.create_monitored_items(params)

            response = ua.CreateMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending create monitored items response")
            reply(response)

        elif is_request(ids.ModifyMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("modify monitored items request")
            params = ua.ModifyMonitoredItemsParameters.from_binary(body)
            results = self.session.modify_monitored_items(params)

            response = ua.ModifyMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending modify monitored items response")
            reply(response)

        elif is_request(ids.DeleteMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("delete monitored items request")
            params = ua.DeleteMonitoredItemsParameters.from_binary(body)

            results = self.session.delete_monitored_items(params)

            response = ua.DeleteMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending delete monitored items response")
            reply(response)

        elif is_request(ids.HistoryReadRequest_Encoding_DefaultBinary):
            self.logger.info("history read request")
            params = ua.HistoryReadParameters.from_binary(body)

            results = self.session.history_read(params)

            response = ua.HistoryReadResponse()
            response.Results = results

            self.logger.info("sending history read response")
            reply(response)

        elif is_request(ids.PublishRequest_Encoding_DefaultBinary):
            self.logger.info("publish request")

            if not self.session:
                return False

            params = ua.PublishParameters.from_binary(body)

            # No response is sent here: the request is queued and answered
            # later by forward_publish_response.
            data = PublishRequestData()
            data.requesthdr = requesthdr
            data.seqhdr = seqhdr
            data.algohdr = algohdr
            with self._datalock:
                self._publishdata_queue.append(data)  # will be used to send publish answers from server
                if self._publish_result_queue:
                    # A result was waiting for a request; deliver it now.
                    result = self._publish_result_queue.pop(0)
                    self.forward_publish_response(result)
            self.session.publish(params.SubscriptionAcknowledgements)
            self.logger.info("publish forward to server")

        elif is_request(ids.RepublishRequest_Encoding_DefaultBinary):
            self.logger.info("re-publish request")

            params = ua.RepublishParameters.from_binary(body)
            msg = self.session.republish(params)

            response = ua.RepublishResponse()
            response.NotificationMessage = msg

            reply(response)

        elif is_request(ids.CloseSecureChannelRequest_Encoding_DefaultBinary):
            self.logger.info("close secure channel request")
            response = ua.CloseSecureChannelResponse()
            reply(response)
            self.channel = None
            return False

        elif is_request(ids.CallRequest_Encoding_DefaultBinary):
            self.logger.info("call request")

            params = ua.CallParameters.from_binary(body)

            results = self.session.call(params.MethodsToCall)

            response = ua.CallResponse()
            response.Results = results

            reply(response)

        else:
            self.logger.warning("Unknown message received %s", typeid)
            raise utils.ServiceError(ua.StatusCodes.BadNotImplemented)

        return True
411
412 1
    def _open_secure_channel(self, params):
        """Create or renew the secure channel and derive the symmetric keys.

        A new channel is created when none exists or when the request type
        is ``Issue``; otherwise the existing channel is reused. In both
        cases the token id is incremented and the creation time, lifetime
        and server nonce are refreshed.

        :param params: request parameters; ``RequestType``,
            ``RequestedLifetime`` and ``ClientNonce`` are read.
        :returns: the channel result stored in ``self.channel``.
        """
        self.logger.info("open secure channel")

        issue_requested = params.RequestType == ua.SecurityTokenRequestType.Issue
        if not self.channel or issue_requested:
            self.channel = ua.OpenSecureChannelResult()
            fresh_token = self.channel.SecurityToken
            fresh_token.TokenId = 13  # random value
            fresh_token.ChannelId = self.iserver.get_new_channel_id()
            fresh_token.RevisedLifetime = params.RequestedLifetime

        token = self.channel.SecurityToken
        token.TokenId += 1
        token.CreatedAt = datetime.utcnow()
        token.RevisedLifetime = params.RequestedLifetime

        policy = self._connection._security_policy
        self.channel.ServerNonce = utils.create_nonce(policy.symmetric_key_size)
        self._connection.set_security_token(self.channel.SecurityToken)
        self._connection._security_policy.make_symmetric_key(self.channel.ServerNonce, params.ClientNonce)
        return self.channel
426
427 1
    def close(self):
        """
        to be called when client has disconnected to ensure we really close
        everything we should

        If a session exists it is closed with ``close_session(True)``.
        """
        # Report through the instance logger, like the rest of the class,
        # instead of printing to stdout.
        self.logger.info("Cleanup client connection: %s", self.name)
        if self.session:
            self.session.close_session(True)
435