Completed
Pull Request — master (#70)
by Olivier
03:03
created

opcua.UAProcessor._process_message()   F

Complexity

Conditions 25

Size

Total Lines 257

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 145
CRAP Score 28.1471
Metric Value
dl 0
loc 257
ccs 145
cts 175
cp 0.8286
rs 2
cc 25
crap 28.1471

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Complexity

Complex methods like opcua.UAProcessor._process_message() are often a sign that their class does a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
2 1
import logging
3 1
from threading import Lock
4 1
from datetime import datetime
5
6 1
from opcua import ua
7 1
from opcua import utils
8
9
10 1
class PublishRequestData(object):
    """
    Plain container for the three headers of one publish request:
    the request header, the algorithm header and the sequence header.
    All of them start out as None and are filled in by the caller.
    """

    def __init__(self):
        self.requesthdr = self.algohdr = self.seqhdr = None
16
17
18 1
class UAProcessor(object):
19
20 1
    def __init__(self, internal_server, socket):
        """
        Set up the processor for one client connection.

        internal_server is the object requests are forwarded to.
        socket is the client transport; this class calls its
        get_extra_info() and write() methods.
        """
        self.logger = logging.getLogger(__name__)
        self.iserver = internal_server

        # transport and the addresses it reports for both ends
        self.name = socket.get_extra_info('peername')
        self.sockname = socket.get_extra_info('sockname')
        self.socket = socket

        # no session and no secure channel until the client asks for them
        self.session = None
        self.channel = None

        # one lock for writing to the socket, one for the publish queue
        self._socketlock = Lock()
        self._datalock = Lock()

        # publish requests waiting for an answer, oldest first
        self._publishdata_queue = []
        # sequence number given to the next outgoing message
        self._seq_number = 1
32
33 1
    def send_response(self, requesthandle, algohdr, seqhdr, response, msgtype=ua.MessageType.SecureMessage):
        """
        Complete the headers of a response and write it to the client.

        The response gets requesthandle as its RequestHandle and seqhdr
        gets the next sequence number. A symmetric algorithm header also
        receives the token id of the current channel. Numbering and
        writing both happen while the socket lock is held.
        """
        with self._socketlock:
            response.ResponseHeader.RequestHandle = requesthandle
            seqhdr.SequenceNumber = self._seq_number
            self._seq_number += 1

            token = self.channel.SecurityToken
            header = ua.Header(msgtype, ua.ChunkType.Single, token.ChannelId)
            if isinstance(algohdr, ua.SymmetricAlgorithmHeader):
                algohdr.TokenId = token.TokenId

            self._write_socket(header, algohdr, seqhdr, response)
42
43 1
    def _write_socket(self, hdr, *args):
        """
        Serialize hdr followed by every object in args and write the
        result to the socket in a single call.

        Each part is serialized with to_binary() and its length is
        reported to hdr.add_size() before the header itself is serialized.
        """
        chunks = []
        for part in args:
            encoded = part.to_binary()
            hdr.add_size(len(encoded))
            chunks.append(encoded)

        # the header is serialized last, once all sizes have been added
        packet = b"".join([hdr.to_binary()] + chunks)
        self.logger.info("writting %s bytes to socket, with header %s ", len(packet), hdr)
        self.socket.write(packet)
55
56 1
    def open_secure_channel(self, body):
        """
        Handle an open secure channel message: decode it from body,
        open or renew the channel and send the result back to the client.
        """
        # the three structures are decoded from body one after the other
        algohdr = ua.AsymmetricAlgorithmHeader.from_binary(body)
        seqhdr = ua.SequenceHeader.from_binary(body)
        request = ua.OpenSecureChannelRequest.from_binary(body)

        negotiated = self._open_secure_channel(request.Parameters)

        reply = ua.OpenSecureChannelResponse()
        reply.Parameters = negotiated
        self.send_response(request.RequestHeader.RequestHandle, algohdr, seqhdr, reply, ua.MessageType.SecureOpen)
66
67 1
    def forward_publish_response(self, result):
        """
        Send result to the client as the answer to the oldest queued
        publish request.

        If no publish request is queued a warning is logged and nothing
        is sent.
        """
        self.logger.info("forward publish response %s", result)

        # only the queue access is done under the data lock
        with self._datalock:
            if not self._publishdata_queue:
                self.logger.warning("Error server wants to send publish answer but no publish request is available")
                return
            pending = self._publishdata_queue.pop(0)

        answer = ua.PublishResponse()
        answer.Parameters = result
        self.send_response(pending.requesthdr.RequestHandle, pending.algohdr, pending.seqhdr, answer)
78
79 1
    def process(self, header, body):
        """
        Handle one incoming message according to header.MessageType.

        Returns False for a close secure channel message, the result of
        process_message() for a secure message, and True in every other
        case (including unsupported message types, which are only logged).
        """
        kind = header.MessageType

        if kind == ua.MessageType.Hello:
            # answer with an acknowledge echoing the client's buffer sizes
            hello = ua.Hello.from_binary(body)
            ack = ua.Acknowledge()
            ack.ReceiveBufferSize = hello.ReceiveBufferSize
            ack.SendBufferSize = hello.SendBufferSize
            self._write_socket(ua.Header(ua.MessageType.Acknowledge, ua.ChunkType.Single), ack)
            return True

        if kind == ua.MessageType.Error:
            self.logger.warning("Received an error message type")
            return True

        if kind == ua.MessageType.SecureOpen:
            self.open_secure_channel(body)
            return True

        if kind == ua.MessageType.SecureClose:
            # the close request is honoured even if the channel id is not ours
            is_current = self.channel and header.ChannelId == self.channel.SecurityToken.ChannelId
            if not is_current:
                self.logger.warning("Request to close channel %s which was not issued, current channel is %s", header.ChannelId, self.channel)
            return False

        if kind == ua.MessageType.SecureMessage:
            algohdr = ua.SymmetricAlgorithmHeader.from_binary(body)
            seqhdr = ua.SequenceHeader.from_binary(body)
            return self.process_message(algohdr, seqhdr, body)

        self.logger.warning("Unsupported message type: %s", header.MessageType)
        return True
107
108 1
    def process_message(self, algohdr, seqhdr, body):
        """
        Decode the type id and request header from body and dispatch the
        request to _process_message().

        A utils.ServiceError raised while handling the request is turned
        into a ServiceFault response carrying the error's status code;
        True is returned in that case.
        """
        typeid = ua.NodeId.from_binary(body)
        requesthdr = ua.RequestHeader.from_binary(body)
        try:
            outcome = self._process_message(typeid, requesthdr, algohdr, seqhdr, body)
        except utils.ServiceError as fault:
            status = ua.StatusCode(fault.code)
            reply = ua.ServiceFault()
            reply.ResponseHeader.ServiceResult = status
            self.logger.info("sending service fault response: %s (%s)", status.doc, status.name)
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, reply)
            return True
        return outcome
120
121 1
    def _process_message(self, typeid, requesthdr, algohdr, seqhdr, body):
        """
        Execute one service request and send its response.

        typeid identifies the request; the request parameters are decoded
        from body. Session services are forwarded to self.session, the
        discovery and registration services to self.iserver.

        Returns True after a request has been handled. Returns False for a
        publish request received without a session and for a close secure
        channel request.
        NOTE(review): the meaning of False is decided by the caller, which
        is not visible here -- presumably "stop serving this connection".

        Raises utils.ServiceError with BadSessionIdInvalid when a session is
        activated before being created, and with BadNotImplemented for an
        unknown typeid.

        NOTE(review): apart from activate session and publish, the session
        services use self.session without checking that it exists.
        """
        if typeid == ua.NodeId(ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Create session request")
            params = ua.CreateSessionParameters.from_binary(body)

            self.session = self.iserver.create_session(self.name)  # create the session on server
            sessiondata = self.session.create_session(params, sockname=self.sockname)  # get a session creation result to send back

            response = ua.CreateSessionResponse()
            response.Parameters = sessiondata

            self.logger.info("sending create sesssion response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Close session request")
            deletesubs = ua.unpack_uatype('Boolean', body)

            self.session.close_session(deletesubs)

            response = ua.CloseSessionResponse()
            self.logger.info("sending close sesssion response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Activate session request")
            params = ua.ActivateSessionParameters.from_binary(body)

            if not self.session:
                self.logger.info("request to activate non-existing session")
                raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)

            result = self.session.activate_session(params)

            response = ua.ActivateSessionResponse()
            response.Parameters = result

            # this message used to read "sending read response", copied from the read branch
            self.logger.info("sending activate session response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ReadRequest_Encoding_DefaultBinary):
            self.logger.info("Read request")
            params = ua.ReadParameters.from_binary(body)

            results = self.session.read(params)

            response = ua.ReadResponse()
            response.Results = results

            self.logger.info("sending read response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.WriteRequest_Encoding_DefaultBinary):
            self.logger.info("Write request")
            params = ua.WriteParameters.from_binary(body)

            results = self.session.write(params)

            response = ua.WriteResponse()
            response.Results = results

            self.logger.info("sending write response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.BrowseRequest_Encoding_DefaultBinary):
            self.logger.info("Browse request")
            params = ua.BrowseParameters.from_binary(body)

            results = self.session.browse(params)

            response = ua.BrowseResponse()
            response.Results = results

            self.logger.info("sending browse response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary):
            self.logger.info("get endpoints request")
            params = ua.GetEndpointsParameters.from_binary(body)

            endpoints = self.iserver.get_endpoints(params, sockname=self.sockname)

            response = ua.GetEndpointsResponse()
            response.Endpoints = endpoints

            self.logger.info("sending get endpoints response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.FindServersRequest_Encoding_DefaultBinary):
            self.logger.info("find servers request")
            params = ua.FindServersParameters.from_binary(body)

            servers = self.iserver.find_servers(params)

            response = ua.FindServersResponse()
            response.Servers = servers

            self.logger.info("sending find servers response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServerRequest_Encoding_DefaultBinary):
            self.logger.info("register server request")
            serv = ua.RegisteredServer.from_binary(body)

            self.iserver.register_server(serv)

            response = ua.RegisterServerResponse()

            self.logger.info("sending register server response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServer2Request_Encoding_DefaultBinary):
            self.logger.info("register server 2 request")
            params = ua.RegisterServer2Parameters.from_binary(body)

            results = self.iserver.register_server2(params)

            response = ua.RegisterServer2Response()
            response.ConfigurationResults = results

            self.logger.info("sending register server 2 response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary):
            self.logger.info("translate browsepaths to nodeids request")
            params = ua.TranslateBrowsePathsToNodeIdsParameters.from_binary(body)

            paths = self.session.translate_browsepaths_to_nodeids(params.BrowsePaths)

            response = ua.TranslateBrowsePathsToNodeIdsResponse()
            response.Results = paths

            self.logger.info("sending translate browsepaths to nodeids response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary):
            self.logger.info("add nodes request")
            params = ua.AddNodesParameters.from_binary(body)

            results = self.session.add_nodes(params.NodesToAdd)

            response = ua.AddNodesResponse()
            response.Results = results

            self.logger.info("sending add node response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CreateSubscriptionRequest_Encoding_DefaultBinary):
            self.logger.info("create subscription request")
            params = ua.CreateSubscriptionParameters.from_binary(body)

            # forward_publish_response is handed over as the callback for publish results
            result = self.session.create_subscription(params, self.forward_publish_response)

            response = ua.CreateSubscriptionResponse()
            response.Parameters = result

            self.logger.info("sending create subscription response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteSubscriptionsRequest_Encoding_DefaultBinary):
            self.logger.info("delete subscriptions request")
            params = ua.DeleteSubscriptionsParameters.from_binary(body)

            results = self.session.delete_subscriptions(params.SubscriptionIds)

            response = ua.DeleteSubscriptionsResponse()
            response.Results = results

            self.logger.info("sending delte subscription response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CreateMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("create monitored items request")
            params = ua.CreateMonitoredItemsParameters.from_binary(body)
            results = self.session.create_monitored_items(params)

            response = ua.CreateMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending create monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ModifyMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("modify monitored items request")
            params = ua.ModifyMonitoredItemsParameters.from_binary(body)
            results = self.session.modify_monitored_items(params)

            response = ua.ModifyMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending modify monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("delete monitored items request")
            params = ua.DeleteMonitoredItemsParameters.from_binary(body)

            results = self.session.delete_monitored_items(params)

            response = ua.DeleteMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending delete monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.PublishRequest_Encoding_DefaultBinary):
            self.logger.info("publish request")

            if not self.session:
                return False

            params = ua.PublishParameters.from_binary(body)

            # no response is sent here: the headers are queued and used
            # later by forward_publish_response()
            data = PublishRequestData()
            data.requesthdr = requesthdr
            data.seqhdr = seqhdr
            data.algohdr = algohdr
            with self._datalock:
                self._publishdata_queue.append(data)  # will be used to send publish answers from server
            self.session.publish(params.SubscriptionAcknowledgements)
            self.logger.info("publish forward to server")

        elif typeid == ua.NodeId(ua.ObjectIds.RepublishRequest_Encoding_DefaultBinary):
            self.logger.info("re-publish request")

            params = ua.RepublishParameters.from_binary(body)
            msg = self.session.republish(params)

            response = ua.RepublishResponse()
            response.NotificationMessage = msg

            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CloseSecureChannelRequest_Encoding_DefaultBinary):
            self.logger.info("close secure channel request")
            response = ua.CloseSecureChannelResponse()
            # the response must go out before the channel is dropped,
            # send_response() reads the channel's security token
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
            self.channel = None
            return False

        elif typeid == ua.NodeId(ua.ObjectIds.CallRequest_Encoding_DefaultBinary):
            self.logger.info("call request")

            params = ua.CallParameters.from_binary(body)

            results = self.session.call(params.MethodsToCall)

            response = ua.CallResponse()
            response.Results = results

            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        else:
            self.logger.warning("Unknown message received %s", typeid)
            raise utils.ServiceError(ua.StatusCodes.BadNotImplemented)

        return True
378
379 1
    def _open_secure_channel(self, params):
        """
        Create the secure channel, or renew the token of the existing one.

        A new channel with a new channel id is created when there is no
        channel yet or when params.RequestType is Issue. In every case the
        token id is incremented, the creation time and the revised lifetime
        are refreshed and a new server nonce is generated.

        Returns the channel, which is also stored in self.channel.
        """
        self.logger.info("open secure channel")
        if not self.channel or params.RequestType == ua.SecurityTokenRequestType.Issue:
            self.channel = ua.OpenSecureChannelResult()
            # arbitrary start value; it is incremented below, so the first
            # token handed out has id 14
            self.channel.SecurityToken.TokenId = 13
            self.channel.SecurityToken.ChannelId = self.iserver.get_new_channel_id()

        token = self.channel.SecurityToken
        token.TokenId += 1
        # CreatedAt is a UTC time in OPC UA, so the server's local time
        # must not be used. utcnow() keeps the value naive as before.
        token.CreatedAt = datetime.utcnow()
        # the requested lifetime is granted unchanged
        token.RevisedLifetime = params.RequestedLifetime
        self.channel.ServerNonce = utils.create_nonce()
        return self.channel
391
392 1
    def close(self):
        """
        to be called when client has disconnected to ensure we really close
        everything we should

        If a session exists it is closed with its subscriptions deleted
        (close_session is called with True).
        """
        # reported through the logger like everything else in this class,
        # instead of being printed to stdout
        self.logger.info("Cleanup client connection: %s", self.name)
        if self.session:
            self.session.close_session(True)
400