Completed
Pull Request — master (#396)
by
unknown
04:44
created

UaProcessor   C

Complexity

Total Complexity 55

Size/Duplication

Total Lines 438
Duplicated Lines 0 %

Test Coverage

Coverage 81.31%

Importance

Changes 2
Bugs 1 Features 0
Metric Value
c 2
b 1
f 0
dl 0
loc 438
ccs 248
cts 305
cp 0.8131
rs 6
wmc 55

9 Methods

Rating   Name   Duplication   Size   Complexity  
A __init__() 0 12 1
A set_policies() 0 2 1
B forward_publish_response() 0 18 5
A send_response() 0 7 2
A open_secure_channel() 0 11 1
A close() 0 8 2
F _process_message() 0 333 33
A process_message() 0 12 2
D process() 0 25 8

How to fix   Complexity   

Complex Class

Complex classes like UaProcessor often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
2 1
import logging
3 1
from threading import RLock, Lock
4 1
import time
5
6 1
from opcua import ua
7 1
from opcua.common import utils
8
9
10 1
class PublishRequestData(object):
11
12 1
    def __init__(self):
13 1
        self.requesthdr = None
14 1
        self.algohdr = None
15 1
        self.seqhdr = None
16 1
        self.timestamp = time.time()
17
18
19 1
class UaProcessor(object):
20
21 1
    def __init__(self, internal_server, socket):
22 1
        self.logger = logging.getLogger(__name__)
23 1
        self.iserver = internal_server
24 1
        self.name = socket.get_extra_info('peername')
25 1
        self.sockname = socket.get_extra_info('sockname')
26 1
        self.session = None
27 1
        self.socket = socket
28 1
        self._socketlock = Lock()
29 1
        self._datalock = RLock()
30 1
        self._publishdata_queue = []
31 1
        self._publish_result_queue = []  # used when we need to wait for PublishRequest
32 1
        self._connection = ua.SecureConnection(ua.SecurityPolicy())
33
34 1
    def set_policies(self, policies):
35 1
        self._connection.set_policy_factories(policies)
36
37 1
    def send_response(self, requesthandle, algohdr, seqhdr, response, msgtype=ua.MessageType.SecureMessage):
38 1
        with self._socketlock:
39 1
            response.ResponseHeader.RequestHandle = requesthandle
40 1
            data = self._connection.message_to_binary(
41
                response.to_binary(), message_type=msgtype, request_id=seqhdr.RequestId, algohdr=algohdr)
42
43 1
            self.socket.write(data)
44
45 1
    def open_secure_channel(self, algohdr, seqhdr, body):
46 1
        request = ua.OpenSecureChannelRequest.from_binary(body)
47
48 1
        self._connection.select_policy(
49
            algohdr.SecurityPolicyURI, algohdr.SenderCertificate, request.Parameters.SecurityMode)
50
51 1
        channel = self._connection.open(request.Parameters, self.iserver)
52
        # send response
53 1
        response = ua.OpenSecureChannelResponse()
54 1
        response.Parameters = channel
55 1
        self.send_response(request.RequestHeader.RequestHandle, None, seqhdr, response, ua.MessageType.SecureOpen)
56
57 1
    def forward_publish_response(self, result):
58 1
        self.logger.info("forward publish response %s", result)
59 1
        with self._datalock:
60 1
            while True:
61 1
                if len(self._publishdata_queue) == 0:
62
                    self._publish_result_queue.append(result)
63
                    self.logger.info("Server wants to send publish answer but no publish request is available,"
64
                                     "enqueing notification, length of result queue is %s",
65
                                     len(self._publish_result_queue))
66
                    return
67 1
                requestdata = self._publishdata_queue.pop(0)
68 1
                if time.time() - requestdata.timestamp < requestdata.requesthdr.TimeoutHint / 1000:
69 1
                    break
70
71 1
        response = ua.PublishResponse()
72 1
        response.Parameters = result
73
74 1
        self.send_response(requestdata.requesthdr.RequestHandle, requestdata.algohdr, requestdata.seqhdr, response)
75
76 1
    def process(self, header, body):
77 1
        msg = self._connection.receive_from_header_and_body(header, body)
78 1
        if isinstance(msg, ua.Message):
79 1
            if header.MessageType == ua.MessageType.SecureOpen:
80 1
                self.open_secure_channel(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())
81
82 1
            elif header.MessageType == ua.MessageType.SecureClose:
83 1
                self._connection.close()
84 1
                return False
85
86 1
            elif header.MessageType == ua.MessageType.SecureMessage:
87 1
                return self.process_message(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())
88 1
        elif isinstance(msg, ua.Hello):
89 1
            ack = ua.Acknowledge()
90 1
            ack.ReceiveBufferSize = msg.ReceiveBufferSize
91 1
            ack.SendBufferSize = msg.SendBufferSize
92 1
            data = self._connection.tcp_to_binary(ua.MessageType.Acknowledge, ack)
93 1
            self.socket.write(data)
94
95
        elif isinstance(msg, ua.ErrorMessage):
96
            self.logger.warning("Received an error message type")
97
        elif msg is not None:  # ua.MessageType.SecureMessage and ChunkType.Intermediate
98
            self.logger.warning("Unsupported message type: %s", header.MessageType)
99
            raise utils.ServiceError(ua.StatusCodes.BadTcpMessageTypeInvalid)
100 1
        return True
101
102 1
    def process_message(self, algohdr, seqhdr, body):
103 1
        typeid = ua.NodeId.from_binary(body)
104 1
        requesthdr = ua.RequestHeader.from_binary(body)
105 1
        try:
106 1
            return self._process_message(typeid, requesthdr, algohdr, seqhdr, body)
107 1
        except utils.ServiceError as e:
108 1
            status = ua.StatusCode(e.code)
109 1
            response = ua.ServiceFault()
110 1
            response.ResponseHeader.ServiceResult = status
111 1
            self.logger.info("sending service fault response: %s (%s)", status.doc, status.name)
112 1
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
113 1
            return True
114
115 1
    def _process_message(self, typeid, requesthdr, algohdr, seqhdr, body):
116 1
        if typeid == ua.NodeId(ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary):
117 1
            self.logger.info("Create session request")
118 1
            params = ua.CreateSessionParameters.from_binary(body)
119
120
            # create the session on server
121 1
            self.session = self.iserver.create_session(self.name, external=True)
122
            # get a session creation result to send back
123 1
            sessiondata = self.session.create_session(params, sockname=self.sockname)
124
125 1
            response = ua.CreateSessionResponse()
126 1
            response.Parameters = sessiondata
127 1
            response.Parameters.ServerCertificate = self._connection._security_policy.client_certificate
128 1
            if self._connection._security_policy.server_certificate is None:
129 1
                data = params.ClientNonce
130
            else:
131 1
                data = self._connection._security_policy.server_certificate + params.ClientNonce
132 1
            response.Parameters.ServerSignature.Signature = \
133
                self._connection._security_policy.asymmetric_cryptography.signature(data)
134
135 1
            response.Parameters.ServerSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
136
137 1
            self.logger.info("sending create sesssion response")
138 1
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
139
140 1
        elif typeid == ua.NodeId(ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary):
141 1
            self.logger.info("Close session request")
142 1
            deletesubs = ua.ua_binary.Primitives.Boolean.unpack(body)
143
144 1
            self.session.close_session(deletesubs)
145
146 1
            response = ua.CloseSessionResponse()
147 1
            self.logger.info("sending close sesssion response")
148 1
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
149
150 1
        elif typeid == ua.NodeId(ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary):
151 1
            self.logger.info("Activate session request")
152 1
            params = ua.ActivateSessionParameters.from_binary(body)
153
154 1
            if not self.session:
155
                self.logger.info("request to activate non-existing session")
156
                raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)
157
158 1
            if self._connection._security_policy.client_certificate is None:
159 1
                data = self.session.nonce
160
            else:
161 1
                data = self._connection._security_policy.client_certificate + self.session.nonce
162 1
            self._connection._security_policy.asymmetric_cryptography.verify(data, params.ClientSignature.Signature)
163
164 1
            result = self.session.activate_session(params)
165
166 1
            response = ua.ActivateSessionResponse()
167 1
            response.Parameters = result
168
169 1
            self.logger.info("sending read response")
170 1
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
171
172 1
        elif typeid == ua.NodeId(ua.ObjectIds.ReadRequest_Encoding_DefaultBinary):
173 1
            self.logger.info("Read request")
174 1
            params = ua.ReadParameters.from_binary(body)
175
176 1
            results = self.session.read(params)
177
178 1
            response = ua.ReadResponse()
179 1
            response.Results = results
180
181 1
            self.logger.info("sending read response")
182 1
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
183
184 1
        elif typeid == ua.NodeId(ua.ObjectIds.WriteRequest_Encoding_DefaultBinary):
185 1
            self.logger.info("Write request")
186 1
            params = ua.WriteParameters.from_binary(body)
187
188 1
            results = self.session.write(params)
189
190 1
            response = ua.WriteResponse()
191 1
            response.Results = results
192
193 1
            self.logger.info("sending write response")
194 1
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
195
196 1
        elif typeid == ua.NodeId(ua.ObjectIds.BrowseRequest_Encoding_DefaultBinary):
197 1
            self.logger.info("Browse request")
198 1
            params = ua.BrowseParameters.from_binary(body)
199
200 1
            results = self.session.browse(params)
201
202 1
            response = ua.BrowseResponse()
203 1
            response.Results = results
204
205 1
            self.logger.info("sending browse response")
206 1
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
207
208 1
        elif typeid == ua.NodeId(ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary):
209 1
            self.logger.info("get endpoints request")
210 1
            params = ua.GetEndpointsParameters.from_binary(body)
211
212 1
            endpoints = self.iserver.get_endpoints(params, sockname=self.sockname)
213
214 1
            response = ua.GetEndpointsResponse()
215 1
            response.Endpoints = endpoints
216
217 1
            self.logger.info("sending get endpoints response")
218 1
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
219
220 1
        elif typeid == ua.NodeId(ua.ObjectIds.FindServersRequest_Encoding_DefaultBinary):
221 1
            self.logger.info("find servers request")
222 1
            params = ua.FindServersParameters.from_binary(body)
223
224 1
            servers = self.iserver.find_servers(params)
225
226 1
            response = ua.FindServersResponse()
227 1
            response.Servers = servers
228
229 1
            self.logger.info("sending find servers response")
230 1
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
231
232 1
        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServerRequest_Encoding_DefaultBinary):
233 1
            self.logger.info("register server request")
234 1
            serv = ua.RegisteredServer.from_binary(body)
235
236 1
            self.iserver.register_server(serv)
237
238 1
            response = ua.RegisterServerResponse()
239
240 1
            self.logger.info("sending register server response")
241 1
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
242
243 1
        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServer2Request_Encoding_DefaultBinary):
244
            self.logger.info("register server 2 request")
245
            params = ua.RegisterServer2Parameters.from_binary(body)
246
247
            results = self.iserver.register_server2(params)
248
249
            response = ua.RegisterServer2Response()
250
            response.ConfigurationResults = results
251
252
            self.logger.info("sending register server 2 response")
253
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
254
255 1
        elif typeid == ua.NodeId(ua.ObjectIds.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary):
256 1
            self.logger.info("translate browsepaths to nodeids request")
257 1
            params = ua.TranslateBrowsePathsToNodeIdsParameters.from_binary(body)
258
259 1
            paths = self.session.translate_browsepaths_to_nodeids(params.BrowsePaths)
260
261 1
            response = ua.TranslateBrowsePathsToNodeIdsResponse()
262 1
            response.Results = paths
263
264 1
            self.logger.info("sending translate browsepaths to nodeids response")
265 1
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
266
267 1
        elif typeid == ua.NodeId(ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary):
268 1
            self.logger.info("add nodes request")
269 1
            params = ua.AddNodesParameters.from_binary(body)
270
271 1
            results = self.session.add_nodes(params.NodesToAdd)
272
273 1
            response = ua.AddNodesResponse()
274 1
            response.Results = results
275
276 1
            self.logger.info("sending add node response")
277 1
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
278
279 1
        elif typeid == ua.NodeId(ua.ObjectIds.DeleteNodesRequest_Encoding_DefaultBinary):
280 1
            self.logger.info("delete nodes request")
281 1
            params = ua.DeleteNodesParameters.from_binary(body)
282
283 1
            results = self.session.delete_nodes(params)
284
285 1
            response = ua.DeleteNodesResponse()
286 1
            response.Results = results
287
288 1
            self.logger.info("sending delete node response")
289 1
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
290
291 1
        elif typeid == ua.NodeId(ua.ObjectIds.AddReferencesRequest_Encoding_DefaultBinary):
292 1
            self.logger.info("add references request")
293 1
            params = ua.AddReferencesParameters.from_binary(body)
294
295 1
            results = self.session.add_references(params.ReferencesToAdd)
296
297 1
            response = ua.AddReferencesResponse()
298 1
            response.Results = results
299
300 1
            self.logger.info("sending add references response")
301 1
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
302
303 1
        elif typeid == ua.NodeId(ua.ObjectIds.CreateSubscriptionRequest_Encoding_DefaultBinary):
304 1
            self.logger.info("create subscription request")
305 1
            params = ua.CreateSubscriptionParameters.from_binary(body)
306
307 1
            result = self.session.create_subscription(params, self.forward_publish_response)
308
309 1
            response = ua.CreateSubscriptionResponse()
310 1
            response.Parameters = result
311
312 1
            self.logger.info("sending create subscription response")
313 1
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
314
315 1
        elif typeid == ua.NodeId(ua.ObjectIds.DeleteSubscriptionsRequest_Encoding_DefaultBinary):
316 1
            self.logger.info("delete subscriptions request")
317 1
            params = ua.DeleteSubscriptionsParameters.from_binary(body)
318
319 1
            results = self.session.delete_subscriptions(params.SubscriptionIds)
320
321 1
            response = ua.DeleteSubscriptionsResponse()
322 1
            response.Results = results
323
324 1
            self.logger.info("sending delte subscription response")
325 1
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
326
327 1
        elif typeid == ua.NodeId(ua.ObjectIds.CreateMonitoredItemsRequest_Encoding_DefaultBinary):
328 1
            self.logger.info("create monitored items request")
329 1
            params = ua.CreateMonitoredItemsParameters.from_binary(body)
330 1
            results = self.session.create_monitored_items(params)
331
332 1
            response = ua.CreateMonitoredItemsResponse()
333 1
            response.Results = results
334
335 1
            self.logger.info("sending create monitored items response")
336 1
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
337
338 1
        elif typeid == ua.NodeId(ua.ObjectIds.ModifyMonitoredItemsRequest_Encoding_DefaultBinary):
339
            self.logger.info("modify monitored items request")
340
            params = ua.ModifyMonitoredItemsParameters.from_binary(body)
341
            results = self.session.modify_monitored_items(params)
342
343
            response = ua.ModifyMonitoredItemsResponse()
344
            response.Results = results
345
346
            self.logger.info("sending modify monitored items response")
347
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
348
349 1
        elif typeid == ua.NodeId(ua.ObjectIds.DeleteMonitoredItemsRequest_Encoding_DefaultBinary):
350 1
            self.logger.info("delete monitored items request")
351 1
            params = ua.DeleteMonitoredItemsParameters.from_binary(body)
352
353 1
            results = self.session.delete_monitored_items(params)
354
355 1
            response = ua.DeleteMonitoredItemsResponse()
356 1
            response.Results = results
357
358 1
            self.logger.info("sending delete monitored items response")
359 1
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
360
361 1
        elif typeid == ua.NodeId(ua.ObjectIds.HistoryReadRequest_Encoding_DefaultBinary):
362
            self.logger.info("history read request")
363
            params = ua.HistoryReadParameters.from_binary(body)
364
365
            results = self.session.history_read(params)
366
367
            response = ua.HistoryReadResponse()
368
            response.Results = results
369
370
            self.logger.info("sending history read response")
371
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
372
373 1
        elif typeid == ua.NodeId(ua.ObjectIds.RegisterNodesRequest_Encoding_DefaultBinary):
374
            self.logger.info("register nodes request")
375
            params = ua.RegisterNodesParameters.from_binary(body)
376
            self.logger.info("Node registration not implemented")
377
378
            response = ua.RegisterNodesResponse()
379
            response.Parameters.RegisteredNodeIds = params.NodesToRegister
380
381
            self.logger.info("sending register nodes response")
382
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
383
384 1
        elif typeid == ua.NodeId(ua.ObjectIds.UnregisterNodesRequest_Encoding_DefaultBinary):
385
            self.logger.info("unregister nodes request")
386
            params = ua.UnregisterNodesParameters.from_binary(body)
387
388
            response = ua.UnregisterNodesResponse()
389
390
            self.logger.info("sending unregister nodes response")
391
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
392
393 1
        elif typeid == ua.NodeId(ua.ObjectIds.PublishRequest_Encoding_DefaultBinary):
394 1
            self.logger.info("publish request")
395
396 1
            if not self.session:
397
                return False
398
399 1
            params = ua.PublishParameters.from_binary(body)
400
401 1
            data = PublishRequestData()
402 1
            data.requesthdr = requesthdr
403 1
            data.seqhdr = seqhdr
404 1
            data.algohdr = algohdr
405 1
            with self._datalock:
406 1
                self._publishdata_queue.append(data)  # will be used to send publish answers from server
407 1
                if self._publish_result_queue:
408
                    result = self._publish_result_queue.pop(0)
409
                    self.forward_publish_response(result)
410 1
            self.session.publish(params.SubscriptionAcknowledgements)
411 1
            self.logger.info("publish forward to server")
412
413 1
        elif typeid == ua.NodeId(ua.ObjectIds.RepublishRequest_Encoding_DefaultBinary):
414
            self.logger.info("re-publish request")
415
416
            params = ua.RepublishParameters.from_binary(body)
417
            msg = self.session.republish(params)
418
419
            response = ua.RepublishResponse()
420
            response.NotificationMessage = msg
421
422
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
423
424 1
        elif typeid == ua.NodeId(ua.ObjectIds.CloseSecureChannelRequest_Encoding_DefaultBinary):
425
            self.logger.info("close secure channel request")
426
            self._connection.close()
427
            response = ua.CloseSecureChannelResponse()
428
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
429
            return False
430
431 1
        elif typeid == ua.NodeId(ua.ObjectIds.CallRequest_Encoding_DefaultBinary):
432 1
            self.logger.info("call request")
433
434 1
            params = ua.CallParameters.from_binary(body)
435
436 1
            results = self.session.call(params.MethodsToCall)
437
438 1
            response = ua.CallResponse()
439 1
            response.Results = results
440
441 1
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
442
443
        else:
444 1
            self.logger.warning("Unknown message received %s", typeid)
445 1
            raise utils.ServiceError(ua.StatusCodes.BadNotImplemented)
446
447 1
        return True
448
449 1
    def close(self):
450
        """
451
        to be called when client has disconnected to ensure we really close
452
        everything we should
453
        """
454 1
        print("Cleanup client connection: ", self.name)
455 1
        if self.session:
456
            self.session.close_session(True)
457