Completed
Push — master ( 10a906...bb8334 )
by Olivier
04:52
created

UaProcessor._process_message()   F

Complexity

Conditions 34

Size

Total Lines 346

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 178
CRAP Score 49.8411

Importance

Changes 2
Bugs 0 Features 1
Metric Value
cc 34
c 2
b 0
f 1
dl 0
loc 346
ccs 178
cts 234
cp 0.7607
crap 49.8411
rs 2

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Complexity

Complex code like UaProcessor._process_message() often does a lot of different things. To break it down, we need to identify a cohesive component within the enclosing class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
2 1
import logging
3 1
from threading import RLock, Lock
4 1
import time
5
6 1
from opcua import ua
7 1
from opcua.common import utils
8
9
10 1
class PublishRequestData(object):
    """Record describing one publish request.

    The three header attributes start out as ``None`` and are meant to be
    assigned by whoever creates the record; ``timestamp`` holds the value
    of ``time.time()`` at construction.
    """

    def __init__(self):
        # Chained assignment binds the targets left to right, so the
        # attributes are created in the order requesthdr, algohdr, seqhdr.
        self.requesthdr = self.algohdr = self.seqhdr = None
        self.timestamp = time.time()
17
18
19 1
class UaProcessor(object):
20
21 1
    def __init__(self, internal_server, socket):
        """Set up the per-connection state.

        :param internal_server: object stored as ``self.iserver``
        :param socket: transport object; ``get_extra_info()`` is called on
            it here and it is stored as ``self.socket``
        """
        self.logger = logging.getLogger(__name__)
        self.iserver = internal_server

        # Query the transport for both endpoint addresses, peer first.
        peer_address = socket.get_extra_info('peername')
        local_address = socket.get_extra_info('sockname')
        self.name = peer_address
        self.sockname = local_address

        self.session = None
        self.socket = socket

        # Two independent locks: one held while writing a response,
        # one held while touching the publish queues.
        self._socketlock = Lock()
        self._datalock = RLock()
        self._publishdata_queue = []
        self._publish_result_queue = []  # used when we need to wait for PublishRequest

        default_policy = ua.SecurityPolicy()
        self._connection = ua.SecureConnection(default_policy)
33
34 1
    def set_policies(self, policies):
        """Hand *policies* to the connection's ``set_policy_factories()``."""
        connection = self._connection
        connection.set_policy_factories(policies)
36
37 1
    def send_response(self, requesthandle, algohdr, seqhdr, response, msgtype=ua.MessageType.SecureMessage):
        """Encode *response* and write it to the socket.

        :param requesthandle: value copied into the response header's
            ``RequestHandle``
        :param algohdr: passed through to ``message_to_binary()``
        :param seqhdr: its ``RequestId`` is used as the request id
        :param response: object providing ``ResponseHeader`` and
            ``to_binary()``
        :param msgtype: message type handed to ``message_to_binary()``
        """
        # The whole encode-and-write sequence runs under the socket lock.
        with self._socketlock:
            response.ResponseHeader.RequestHandle = requesthandle
            payload = response.to_binary()
            packet = self._connection.message_to_binary(
                payload,
                message_type=msgtype,
                request_id=seqhdr.RequestId,
                algohdr=algohdr)
            self.socket.write(packet)
44
45 1
    def open_secure_channel(self, algohdr, seqhdr, body):
        """Answer an open secure channel request.

        Decodes the request from *body*, selects the security policy named
        in *algohdr*, opens the channel on the connection and sends the
        result back as a ``SecureOpen`` message.
        """
        request = ua.OpenSecureChannelRequest.from_binary(body)
        channel_params = request.Parameters

        self._connection.select_policy(
            algohdr.SecurityPolicyURI,
            algohdr.SenderCertificate,
            channel_params.SecurityMode)

        opened_channel = self._connection.open(channel_params, self.iserver)

        # Build the reply; it is sent without an algorithm header.
        reply = ua.OpenSecureChannelResponse()
        reply.Parameters = opened_channel
        handle = request.RequestHeader.RequestHandle
        self.send_response(handle, None, seqhdr, reply, ua.MessageType.SecureOpen)
56
57 1
    def forward_publish_response(self, result):
        """Send *result* as the answer to the oldest usable publish request.

        Queued publish requests are taken from the front of
        ``self._publishdata_queue``. A request whose ``TimeoutHint`` is
        non-zero and whose age (``time.time()`` minus its ``timestamp``)
        is not below that hint, interpreted as milliseconds, is discarded
        and the next one is tried. When the queue runs empty, *result* is
        appended to ``self._publish_result_queue`` and nothing is sent.

        :param result: value assigned to the ``Parameters`` of the
            ``PublishResponse`` that is sent
        """
        self.logger.info("forward publish response %s", result)
        with self._datalock:
            while True:
                if not self._publishdata_queue:
                    self._publish_result_queue.append(result)
                    self.logger.info("Server wants to send publish answer but no publish request is available, "
                                     "enqueing notification, length of result queue is %s",
                                     len(self._publish_result_queue))
                    return
                requestdata = self._publishdata_queue.pop(0)
                timeout_ms = requestdata.requesthdr.TimeoutHint
                # A hint of 0 is accepted unconditionally. Dividing by a float
                # keeps sub-second hints from truncating to 0 under Python 2
                # integer division, which would discard every such request.
                if timeout_ms == 0 or time.time() - requestdata.timestamp < timeout_ms / 1000.0:
                    break

        response = ua.PublishResponse()
        response.Parameters = result

        self.send_response(requestdata.requesthdr.RequestHandle, requestdata.algohdr, requestdata.seqhdr, response)
75
76 1
    def process(self, header, body):
        """Handle one received header/body pair.

        The pair is decoded by the connection and handled according to the
        type of the decoded object.

        :return: ``False`` after a ``SecureClose`` message, the result of
            ``process_message()`` for a ``SecureMessage``, ``True`` in all
            other handled cases
        :raises utils.ServiceError: ``BadTcpMessageTypeInvalid`` when the
            decoded object is of none of the handled types
        """
        msg = self._connection.receive_from_header_and_body(header, body)

        if isinstance(msg, ua.Message):
            kind = header.MessageType
            if kind == ua.MessageType.SecureOpen:
                self.open_secure_channel(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())
                return True
            if kind == ua.MessageType.SecureClose:
                self._connection.close()
                return False
            if kind == ua.MessageType.SecureMessage:
                return self.process_message(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())
            # Any other message type inside a ua.Message is ignored.
            return True

        if isinstance(msg, ua.Hello):
            # Answer the hello by echoing the announced buffer sizes.
            ack = ua.Acknowledge()
            ack.ReceiveBufferSize = msg.ReceiveBufferSize
            ack.SendBufferSize = msg.SendBufferSize
            packet = self._connection.tcp_to_binary(ua.MessageType.Acknowledge, ack)
            self.socket.write(packet)
            return True

        if isinstance(msg, ua.ErrorMessage):
            self.logger.warning("Received an error message type")
            return True

        if msg is None:
            # msg is a ChunkType.Intermediate of an ua.MessageType.SecureMessage
            return True

        self.logger.warning("Unsupported message type: %s", header.MessageType)
        raise utils.ServiceError(ua.StatusCodes.BadTcpMessageTypeInvalid)
102
103 1
    def process_message(self, algohdr, seqhdr, body):
        """Decode the request type and header, then dispatch the request.

        A ``utils.ServiceError`` raised while handling the request is
        answered with a ``ServiceFault`` carrying the error's status code.

        :return: the result of ``_process_message()``, or ``True`` when a
            service fault was sent instead
        """
        # Both values are decoded from the same body object, type id first.
        # Presumably each from_binary() consumes its own part of body, so the
        # order must stay as is -- verify against the ua decoder.
        typeid = ua.NodeId.from_binary(body)
        requesthdr = ua.RequestHeader.from_binary(body)
        try:
            outcome = self._process_message(typeid, requesthdr, algohdr, seqhdr, body)
        except utils.ServiceError as error:
            status = ua.StatusCode(error.code)
            fault = ua.ServiceFault()
            fault.ResponseHeader.ServiceResult = status
            self.logger.info("sending service fault response: %s (%s)", status.doc, status.name)
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, fault)
            return True
        return outcome
115
116 1
    def _process_message(self, typeid, requesthdr, algohdr, seqhdr, body):
        """Handle one decoded service request and send its response.

        *typeid* is compared against the binary-encoding node id of each
        supported request type. The matching branch decodes its parameters
        from *body*, calls the session or the internal server, and answers
        through ``send_response()``. A publish request is only queued here;
        its answer is sent later by ``forward_publish_response()``.

        :param typeid: node id identifying the request type
        :param requesthdr: decoded request header; its ``RequestHandle`` is
            echoed in the response
        :param algohdr: passed through to ``send_response()``
        :param seqhdr: passed through to ``send_response()``
        :param body: buffer the request parameters are decoded from
        :return: ``False`` for a publish request received while no session
            exists and after a close secure channel request, ``True``
            otherwise
        :raises utils.ServiceError: ``BadSessionIdInvalid`` when a session
            is activated before one was created, ``BadNotImplemented`` for
            an unknown request type
        """
        # --- session services -------------------------------------------
        if typeid == ua.NodeId(ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Create session request")
            params = ua.CreateSessionParameters.from_binary(body)

            # create the session on server
            self.session = self.iserver.create_session(self.name, external=True)
            # get a session creation result to send back
            sessiondata = self.session.create_session(params, sockname=self.sockname)

            response = ua.CreateSessionResponse()
            response.Parameters = sessiondata
            response.Parameters.ServerCertificate = self._connection._security_policy.client_certificate
            # The signed data is the client nonce, prefixed with the policy's
            # server_certificate when one is set.
            if self._connection._security_policy.server_certificate is None:
                data = params.ClientNonce
            else:
                data = self._connection._security_policy.server_certificate + params.ClientNonce
            response.Parameters.ServerSignature.Signature = \
                self._connection._security_policy.asymmetric_cryptography.signature(data)

            response.Parameters.ServerSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"

            self.logger.info("sending create sesssion response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Close session request")
            deletesubs = ua.ua_binary.Primitives.Boolean.unpack(body)

            self.session.close_session(deletesubs)

            response = ua.CloseSessionResponse()
            self.logger.info("sending close sesssion response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Activate session request")
            params = ua.ActivateSessionParameters.from_binary(body)

            if not self.session:
                self.logger.info("request to activate non-existing session")
                raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)

            # The verified data is the session nonce, prefixed with the
            # policy's client_certificate when one is set.
            if self._connection._security_policy.client_certificate is None:
                data = self.session.nonce
            else:
                data = self._connection._security_policy.client_certificate + self.session.nonce
            self._connection._security_policy.asymmetric_cryptography.verify(data, params.ClientSignature.Signature)

            result = self.session.activate_session(params)

            response = ua.ActivateSessionResponse()
            response.Parameters = result

            # This branch previously logged "sending read response"
            # (copy-paste from the read branch).
            self.logger.info("sending activate session response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        # --- attribute and view services ---------------------------------
        elif typeid == ua.NodeId(ua.ObjectIds.ReadRequest_Encoding_DefaultBinary):
            self.logger.info("Read request")
            params = ua.ReadParameters.from_binary(body)

            results = self.session.read(params)

            response = ua.ReadResponse()
            response.Results = results

            self.logger.info("sending read response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.WriteRequest_Encoding_DefaultBinary):
            self.logger.info("Write request")
            params = ua.WriteParameters.from_binary(body)

            results = self.session.write(params)

            response = ua.WriteResponse()
            response.Results = results

            self.logger.info("sending write response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.BrowseRequest_Encoding_DefaultBinary):
            self.logger.info("Browse request")
            params = ua.BrowseParameters.from_binary(body)

            results = self.session.browse(params)

            response = ua.BrowseResponse()
            response.Results = results

            self.logger.info("sending browse response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        # --- discovery services (answered by the internal server) --------
        elif typeid == ua.NodeId(ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary):
            self.logger.info("get endpoints request")
            params = ua.GetEndpointsParameters.from_binary(body)

            endpoints = self.iserver.get_endpoints(params, sockname=self.sockname)

            response = ua.GetEndpointsResponse()
            response.Endpoints = endpoints

            self.logger.info("sending get endpoints response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.FindServersRequest_Encoding_DefaultBinary):
            self.logger.info("find servers request")
            params = ua.FindServersParameters.from_binary(body)

            servers = self.iserver.find_servers(params)

            response = ua.FindServersResponse()
            response.Servers = servers

            self.logger.info("sending find servers response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServerRequest_Encoding_DefaultBinary):
            self.logger.info("register server request")
            serv = ua.RegisteredServer.from_binary(body)

            self.iserver.register_server(serv)

            response = ua.RegisterServerResponse()

            self.logger.info("sending register server response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServer2Request_Encoding_DefaultBinary):
            self.logger.info("register server 2 request")
            params = ua.RegisterServer2Parameters.from_binary(body)

            results = self.iserver.register_server2(params)

            response = ua.RegisterServer2Response()
            response.ConfigurationResults = results

            self.logger.info("sending register server 2 response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary):
            self.logger.info("translate browsepaths to nodeids request")
            params = ua.TranslateBrowsePathsToNodeIdsParameters.from_binary(body)

            paths = self.session.translate_browsepaths_to_nodeids(params.BrowsePaths)

            response = ua.TranslateBrowsePathsToNodeIdsResponse()
            response.Results = paths

            self.logger.info("sending translate browsepaths to nodeids response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        # --- node management services ------------------------------------
        elif typeid == ua.NodeId(ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary):
            self.logger.info("add nodes request")
            params = ua.AddNodesParameters.from_binary(body)

            results = self.session.add_nodes(params.NodesToAdd)

            response = ua.AddNodesResponse()
            response.Results = results

            self.logger.info("sending add node response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteNodesRequest_Encoding_DefaultBinary):
            self.logger.info("delete nodes request")
            params = ua.DeleteNodesParameters.from_binary(body)

            results = self.session.delete_nodes(params)

            response = ua.DeleteNodesResponse()
            response.Results = results

            self.logger.info("sending delete node response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.AddReferencesRequest_Encoding_DefaultBinary):
            self.logger.info("add references request")
            params = ua.AddReferencesParameters.from_binary(body)

            results = self.session.add_references(params.ReferencesToAdd)

            response = ua.AddReferencesResponse()
            response.Results = results

            self.logger.info("sending add references response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteReferencesRequest_Encoding_DefaultBinary):
            self.logger.info("delete references request")
            params = ua.DeleteReferencesParameters.from_binary(body)

            results = self.session.delete_references(params.ReferencesToDelete)

            response = ua.DeleteReferencesResponse()
            response.Parameters.Results = results

            self.logger.info("sending delete references response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        # --- subscription and monitored item services --------------------
        elif typeid == ua.NodeId(ua.ObjectIds.CreateSubscriptionRequest_Encoding_DefaultBinary):
            self.logger.info("create subscription request")
            params = ua.CreateSubscriptionParameters.from_binary(body)

            # forward_publish_response is handed over as the callback.
            result = self.session.create_subscription(params, self.forward_publish_response)

            response = ua.CreateSubscriptionResponse()
            response.Parameters = result

            self.logger.info("sending create subscription response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteSubscriptionsRequest_Encoding_DefaultBinary):
            self.logger.info("delete subscriptions request")
            params = ua.DeleteSubscriptionsParameters.from_binary(body)

            results = self.session.delete_subscriptions(params.SubscriptionIds)

            response = ua.DeleteSubscriptionsResponse()
            response.Results = results

            self.logger.info("sending delte subscription response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CreateMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("create monitored items request")
            params = ua.CreateMonitoredItemsParameters.from_binary(body)
            results = self.session.create_monitored_items(params)

            response = ua.CreateMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending create monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ModifyMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("modify monitored items request")
            params = ua.ModifyMonitoredItemsParameters.from_binary(body)
            results = self.session.modify_monitored_items(params)

            response = ua.ModifyMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending modify monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("delete monitored items request")
            params = ua.DeleteMonitoredItemsParameters.from_binary(body)

            results = self.session.delete_monitored_items(params)

            response = ua.DeleteMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending delete monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.HistoryReadRequest_Encoding_DefaultBinary):
            self.logger.info("history read request")
            params = ua.HistoryReadParameters.from_binary(body)

            results = self.session.history_read(params)

            response = ua.HistoryReadResponse()
            response.Results = results

            self.logger.info("sending history read response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterNodesRequest_Encoding_DefaultBinary):
            self.logger.info("register nodes request")
            params = ua.RegisterNodesParameters.from_binary(body)
            self.logger.info("Node registration not implemented")

            # The requested node ids are echoed back unchanged.
            response = ua.RegisterNodesResponse()
            response.Parameters.RegisteredNodeIds = params.NodesToRegister

            self.logger.info("sending register nodes response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.UnregisterNodesRequest_Encoding_DefaultBinary):
            self.logger.info("unregister nodes request")
            # The parameters are decoded but not used any further.
            params = ua.UnregisterNodesParameters.from_binary(body)

            response = ua.UnregisterNodesResponse()

            self.logger.info("sending unregister nodes response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.PublishRequest_Encoding_DefaultBinary):
            self.logger.info("publish request")

            if not self.session:
                return False

            params = ua.PublishParameters.from_binary(body)

            data = PublishRequestData()
            data.requesthdr = requesthdr
            data.seqhdr = seqhdr
            data.algohdr = algohdr
            with self._datalock:
                self._publishdata_queue.append(data)  # will be used to send publish answers from server
                # A result that was queued while no publish request was
                # available is forwarded now.
                if self._publish_result_queue:
                    result = self._publish_result_queue.pop(0)
                    self.forward_publish_response(result)
            self.session.publish(params.SubscriptionAcknowledgements)
            self.logger.info("publish forward to server")

        elif typeid == ua.NodeId(ua.ObjectIds.RepublishRequest_Encoding_DefaultBinary):
            self.logger.info("re-publish request")

            params = ua.RepublishParameters.from_binary(body)
            msg = self.session.republish(params)

            response = ua.RepublishResponse()
            response.NotificationMessage = msg

            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CloseSecureChannelRequest_Encoding_DefaultBinary):
            self.logger.info("close secure channel request")
            self._connection.close()
            response = ua.CloseSecureChannelResponse()
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
            return False

        elif typeid == ua.NodeId(ua.ObjectIds.CallRequest_Encoding_DefaultBinary):
            self.logger.info("call request")

            params = ua.CallParameters.from_binary(body)

            results = self.session.call(params.MethodsToCall)

            response = ua.CallResponse()
            response.Results = results

            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        else:
            self.logger.warning("Unknown message received %s", typeid)
            raise utils.ServiceError(ua.StatusCodes.BadNotImplemented)

        return True
462
463 1
    def close(self):
        """
        to be called when client has disconnected to ensure we really close
        everything we should
        """
        # Report through the instance logger, like every other method of
        # this class, instead of printing to stdout.
        self.logger.info("Cleanup client connection: %s", self.name)
        if self.session:
            # True is passed as the close_session() argument, the same
            # position that receives the decoded delete-subscriptions flag
            # when a close session request is handled.
            self.session.close_session(True)
471