Completed
Push — master ( 5a9df5...39e3a6 )
by Olivier
223:12 queued 212:11
created

opcua.server.UAProcessor.process()   D

Complexity

Conditions 8

Size

Total Lines 29

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 20
CRAP Score 8.1419
Metric Value
cc 8
dl 0
loc 29
ccs 20
cts 23
cp 0.8696
crap 8.1419
rs 4
1
2 1
import logging
3 1
from threading import Lock
4 1
from datetime import datetime
5
6 1
from opcua import ua
7 1
from opcua.common import utils
8
9
10 1
class PublishRequestData(object):
    """Headers of a publish request, stored until its response is sent."""

    def __init__(self):
        # All three headers start unset; they are filled in when a publish
        # request is queued by the processor.
        self.requesthdr, self.algohdr, self.seqhdr = None, None, None
16
17
18 1
class UAProcessor(object):
19
20 1
    def __init__(self, internal_server, socket):
        """
        Set up the per-connection state for one client.

        internal_server: server object used later to create sessions, list
            endpoints and allocate channel ids
        socket: transport of this client; get_extra_info() is queried here and
            write() is used when sending
        """
        self.logger = logging.getLogger(__name__)
        self.iserver = internal_server

        # connection endpoints as reported by the transport
        self.name = socket.get_extra_info('peername')
        self.sockname = socket.get_extra_info('sockname')
        self.socket = socket

        # no session or secure channel exists yet
        self.session = None
        self.channel = None

        # one lock for socket writes, one for the publish request queue
        self._socketlock = Lock()
        self._datalock = Lock()
        self._publishdata_queue = []

        self._seq_number = 0
        self._security_policy = ua.SecurityPolicy()
        self._max_chunk_size = 65536
34
35 1
    def send_response(self, requesthandle, algohdr, seqhdr, response, msgtype=ua.MessageType.SecureMessage):
        """
        Serialize response, split it into chunks and write them to the socket.

        requesthandle is copied into the response header and seqhdr supplies
        the request id of the chunks. algohdr is accepted but not used here.
        The whole operation runs under the socket lock.
        """
        with self._socketlock:
            response.ResponseHeader.RequestHandle = requesthandle
            payload = response.to_binary()
            token = self.channel.SecurityToken
            chunks = ua.MessageChunk.message_to_chunks(
                self._security_policy, payload, self._max_chunk_size, msgtype,
                channel_id=token.ChannelId,
                token_id=token.TokenId,
                request_id=seqhdr.RequestId)
            for chunk in chunks:
                # every chunk written gets the next sequence number
                self._seq_number += 1
                chunk.SequenceHeader.SequenceNumber = self._seq_number
                self.socket.write(chunk.to_binary())
45
46 1
    def _write_socket(self, hdr, *args):
47 1
        alle = []
48 1
        for arg in args:
49 1
            data = arg.to_binary()
50 1
            hdr.add_size(len(data))
51 1
            alle.append(data)
52 1
        alle.insert(0, hdr.to_binary())
53 1
        alle = b"".join(alle)
54 1
        self.logger.info("writting %s bytes to socket, with header %s ", len(alle), hdr)
55
        #self.logger.info("writting data %s", hdr, [i for i in args])
56
        #self.logger.debug("data: %s", alle)
57 1
        self.socket.write(alle)
58
59 1
    def open_secure_channel(self, body):
        """
        Handle an OpenSecureChannel message read from body and send the
        response back with message type SecureOpen.
        """
        # the three parts are decoded from body in this order
        algohdr = ua.AsymmetricAlgorithmHeader.from_binary(body)
        seqhdr = ua.SequenceHeader.from_binary(body)
        request = ua.OpenSecureChannelRequest.from_binary(body)

        result = self._open_secure_channel(request.Parameters)

        response = ua.OpenSecureChannelResponse()
        response.Parameters = result
        handle = request.RequestHeader.RequestHandle
        self.send_response(handle, algohdr, seqhdr, response, ua.MessageType.SecureOpen)
69
70 1
    def forward_publish_response(self, result):
        """
        Send result as the answer to the oldest queued publish request.

        When no publish request is queued, a warning is logged and result is
        not sent.
        """
        self.logger.info("forward publish response %s", result)
        with self._datalock:
            if not self._publishdata_queue:
                self.logger.warning("Error server wants to send publish answer but no publish request is available")
                return
            pending = self._publishdata_queue.pop(0)

        # the response is built and sent outside the data lock
        response = ua.PublishResponse()
        response.Parameters = result
        self.send_response(pending.requesthdr.RequestHandle, pending.algohdr, pending.seqhdr, response)
81
82 1
    def process(self, header, body):
        """
        Dispatch one incoming message according to header.MessageType.

        Returns False for a SecureClose message, the result of
        process_message() for a SecureMessage, and True in every other case.
        """
        msgtype = header.MessageType

        if msgtype == ua.MessageType.Hello:
            hello = ua.Hello.from_binary(body)
            hdr = ua.Header(ua.MessageType.Acknowledge, ua.ChunkType.Single)
            ack = ua.Acknowledge()
            # the client's receive buffer size becomes our maximum chunk size
            self._max_chunk_size = hello.ReceiveBufferSize
            ack.ReceiveBufferSize = hello.ReceiveBufferSize
            ack.SendBufferSize = hello.SendBufferSize
            self._write_socket(hdr, ack)
            return True

        if msgtype == ua.MessageType.Error:
            self.logger.warning("Received an error message type")
            return True

        if msgtype == ua.MessageType.SecureOpen:
            self.open_secure_channel(body)
            return True

        if msgtype == ua.MessageType.SecureClose:
            # a mismatching channel id is only logged; False is returned either way
            if not self.channel or header.ChannelId != self.channel.SecurityToken.ChannelId:
                self.logger.warning("Request to close channel %s which was not issued, current channel is %s", header.ChannelId, self.channel)
            return False

        if msgtype == ua.MessageType.SecureMessage:
            algohdr = ua.SymmetricAlgorithmHeader.from_binary(body)
            seqhdr = ua.SequenceHeader.from_binary(body)
            return self.process_message(algohdr, seqhdr, body)

        self.logger.warning("Unsupported message type: %s", msgtype)
        return True
111
112 1
    def process_message(self, algohdr, seqhdr, body):
        """
        Decode the type id and request header from body and handle the request.

        A utils.ServiceError raised by the handler is answered with a
        ServiceFault carrying the error's status code; True is returned in
        that case. Otherwise the handler's return value is passed through.
        """
        typeid = ua.NodeId.from_binary(body)
        requesthdr = ua.RequestHeader.from_binary(body)
        try:
            outcome = self._process_message(typeid, requesthdr, algohdr, seqhdr, body)
        except utils.ServiceError as err:
            status = ua.StatusCode(err.code)
            fault = ua.ServiceFault()
            fault.ResponseHeader.ServiceResult = status
            self.logger.info("sending service fault response: %s (%s)", status.doc, status.name)
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, fault)
            return True
        return outcome
124
125 1
    def _process_message(self, typeid, requesthdr, algohdr, seqhdr, body):
        """
        Handle one service request identified by typeid and send its response.

        The request parameters are decoded from body, the matching call is
        made on the session or the internal server, and the response is sent
        with send_response(). A Publish request is only queued here; its
        response is sent later through forward_publish_response().

        Returns False for a CloseSecureChannel request and for a Publish
        request received while no session exists, True otherwise.

        Raises utils.ServiceError with BadNotImplemented for an unknown
        typeid, and with BadSessionIdInvalid when a session-backed service is
        requested before a session has been created.
        """
        if typeid == ua.NodeId(ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Create session request")
            params = ua.CreateSessionParameters.from_binary(body)

            self.session = self.iserver.create_session(self.name, external=True)  # create the session on server
            sessiondata = self.session.create_session(params, sockname=self.sockname)  # get a session creation result to send back

            response = ua.CreateSessionResponse()
            response.Parameters = sessiondata

            self.logger.info("sending create sesssion response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Close session request")
            deletesubs = ua.unpack_uatype('Boolean', body)

            self._require_session().close_session(deletesubs)

            response = ua.CloseSessionResponse()
            self.logger.info("sending close sesssion response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Activate session request")
            params = ua.ActivateSessionParameters.from_binary(body)

            if not self.session:
                self.logger.info("request to activate non-existing session")
                raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)

            result = self.session.activate_session(params)

            response = ua.ActivateSessionResponse()
            response.Parameters = result

            # this message used to say "read response" (copy-paste slip)
            self.logger.info("sending activate session response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ReadRequest_Encoding_DefaultBinary):
            self.logger.info("Read request")
            params = ua.ReadParameters.from_binary(body)

            results = self._require_session().read(params)

            response = ua.ReadResponse()
            response.Results = results

            self.logger.info("sending read response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.WriteRequest_Encoding_DefaultBinary):
            self.logger.info("Write request")
            params = ua.WriteParameters.from_binary(body)

            results = self._require_session().write(params)

            response = ua.WriteResponse()
            response.Results = results

            self.logger.info("sending write response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.BrowseRequest_Encoding_DefaultBinary):
            self.logger.info("Browse request")
            params = ua.BrowseParameters.from_binary(body)

            results = self._require_session().browse(params)

            response = ua.BrowseResponse()
            response.Results = results

            self.logger.info("sending browse response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary):
            self.logger.info("get endpoints request")
            params = ua.GetEndpointsParameters.from_binary(body)

            endpoints = self.iserver.get_endpoints(params, sockname=self.sockname)

            response = ua.GetEndpointsResponse()
            response.Endpoints = endpoints

            self.logger.info("sending get endpoints response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.FindServersRequest_Encoding_DefaultBinary):
            self.logger.info("find servers request")
            params = ua.FindServersParameters.from_binary(body)

            servers = self.iserver.find_servers(params)

            response = ua.FindServersResponse()
            response.Servers = servers

            self.logger.info("sending find servers response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServerRequest_Encoding_DefaultBinary):
            self.logger.info("register server request")
            serv = ua.RegisteredServer.from_binary(body)

            self.iserver.register_server(serv)

            response = ua.RegisterServerResponse()

            self.logger.info("sending register server response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServer2Request_Encoding_DefaultBinary):
            self.logger.info("register server 2 request")
            params = ua.RegisterServer2Parameters.from_binary(body)

            results = self.iserver.register_server2(params)

            response = ua.RegisterServer2Response()
            response.ConfigurationResults = results

            self.logger.info("sending register server 2 response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary):
            self.logger.info("translate browsepaths to nodeids request")
            params = ua.TranslateBrowsePathsToNodeIdsParameters.from_binary(body)

            paths = self._require_session().translate_browsepaths_to_nodeids(params.BrowsePaths)

            response = ua.TranslateBrowsePathsToNodeIdsResponse()
            response.Results = paths

            self.logger.info("sending translate browsepaths to nodeids response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary):
            self.logger.info("add nodes request")
            params = ua.AddNodesParameters.from_binary(body)

            results = self._require_session().add_nodes(params.NodesToAdd)

            response = ua.AddNodesResponse()
            response.Results = results

            self.logger.info("sending add node response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CreateSubscriptionRequest_Encoding_DefaultBinary):
            self.logger.info("create subscription request")
            params = ua.CreateSubscriptionParameters.from_binary(body)

            # publish results of this subscription come back through
            # forward_publish_response
            result = self._require_session().create_subscription(params, self.forward_publish_response)

            response = ua.CreateSubscriptionResponse()
            response.Parameters = result

            self.logger.info("sending create subscription response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteSubscriptionsRequest_Encoding_DefaultBinary):
            self.logger.info("delete subscriptions request")
            params = ua.DeleteSubscriptionsParameters.from_binary(body)

            results = self._require_session().delete_subscriptions(params.SubscriptionIds)

            response = ua.DeleteSubscriptionsResponse()
            response.Results = results

            self.logger.info("sending delte subscription response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CreateMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("create monitored items request")
            params = ua.CreateMonitoredItemsParameters.from_binary(body)
            results = self._require_session().create_monitored_items(params)

            response = ua.CreateMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending create monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ModifyMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("modify monitored items request")
            params = ua.ModifyMonitoredItemsParameters.from_binary(body)
            results = self._require_session().modify_monitored_items(params)

            response = ua.ModifyMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending modify monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("delete monitored items request")
            params = ua.DeleteMonitoredItemsParameters.from_binary(body)

            results = self._require_session().delete_monitored_items(params)

            response = ua.DeleteMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending delete monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.PublishRequest_Encoding_DefaultBinary):
            self.logger.info("publish request")

            # a publish without a session is not answered with a fault;
            # False is returned instead (existing behaviour, kept as is)
            if not self.session:
                return False

            params = ua.PublishParameters.from_binary(body)

            data = PublishRequestData()
            data.requesthdr = requesthdr
            data.seqhdr = seqhdr
            data.algohdr = algohdr
            with self._datalock:
                self._publishdata_queue.append(data)  # will be used to send publish answers from server
            self.session.publish(params.SubscriptionAcknowledgements)
            self.logger.info("publish forward to server")

        elif typeid == ua.NodeId(ua.ObjectIds.RepublishRequest_Encoding_DefaultBinary):
            self.logger.info("re-publish request")

            params = ua.RepublishParameters.from_binary(body)
            msg = self._require_session().republish(params)

            response = ua.RepublishResponse()
            response.NotificationMessage = msg

            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CloseSecureChannelRequest_Encoding_DefaultBinary):
            self.logger.info("close secure channel request")
            response = ua.CloseSecureChannelResponse()
            # the response must go out before the channel is dropped, since
            # send_response reads the channel's security token
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
            self.channel = None
            return False

        elif typeid == ua.NodeId(ua.ObjectIds.CallRequest_Encoding_DefaultBinary):
            self.logger.info("call request")

            params = ua.CallParameters.from_binary(body)

            results = self._require_session().call(params.MethodsToCall)

            response = ua.CallResponse()
            response.Results = results

            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        else:
            self.logger.warning("Unknown message received %s", typeid)
            raise utils.ServiceError(ua.StatusCodes.BadNotImplemented)

        return True

    def _require_session(self):
        """
        Return the current session.

        Raises utils.ServiceError(BadSessionIdInvalid) when no session has
        been created on this connection, so the client receives a service
        fault instead of the connection failing on a None attribute access.
        """
        if not self.session:
            self.logger.info("request requires a session but none exists")
            raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)
        return self.session
382
383 1
    def _open_secure_channel(self, params):
        """
        Create the secure channel, or refresh its token, and return it.

        A new channel with a fresh channel id is created when none exists or
        when params.RequestType is Issue. In every case the token id is
        incremented, the creation time and revised lifetime are updated and a
        new server nonce is generated.

        params: parameters of the OpenSecureChannel request; RequestType and
            RequestedLifetime are read
        Returns the channel object, which is also stored in self.channel.
        """
        self.logger.info("open secure channel")
        if not self.channel or params.RequestType == ua.SecurityTokenRequestType.Issue:
            self.channel = ua.OpenSecureChannelResult()
            self.channel.SecurityToken.TokenId = 13  # arbitrary start value, incremented below
            self.channel.SecurityToken.ChannelId = self.iserver.get_new_channel_id()
        token = self.channel.SecurityToken
        token.TokenId += 1
        # OPC UA timestamps are UTC; a naive local time would be off by the
        # host's UTC offset.
        # NOTE(review): assumes the ua serializer treats naive datetimes as UTC - confirm
        token.CreatedAt = datetime.utcnow()
        # the requested lifetime is accepted unchanged
        token.RevisedLifetime = params.RequestedLifetime
        self.channel.ServerNonce = utils.create_nonce()
        return self.channel
395
396 1
    def close(self):
397
        """
398
        to be called when client has disconnected to ensure we really close
399
        everything we should
400
        """
401 1
        print("Cleanup client connection: ", self.name)
402 1
        if self.session:
403
            self.session.close_session(True)
404