Completed
Push — master ( 15ed61...18100b )
by Olivier
03:06 queued 35s
created

forward_publish_response()   B

Complexity

Conditions 5

Size

Total Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 5.246
Metric Value
cc 5
dl 0
loc 16
ccs 11
cts 14
cp 0.7856
crap 5.246
rs 8.5454
1
2 1
import logging
3 1
from threading import RLock, Lock
4 1
from datetime import datetime
5 1
import time
6
7 1
from opcua import ua
8 1
from opcua.common import utils
9
10
11 1
class PublishRequestData(object):
    """
    Bookkeeping record for one pending PublishRequest.

    The three header attributes start as None and are assigned by the code
    that receives the request. ``timestamp`` is the ``time.time()`` value
    taken when the record is created.
    """

    def __init__(self):
        self.requesthdr = None
        self.algohdr = None
        self.seqhdr = None
        self.timestamp = time.time()
18
19
20 1
class UaProcessor(object):
21
22 1
    def __init__(self, internal_server, socket):
        """
        Set up the per-connection state.

        :param internal_server: server object, stored as ``self.iserver``
        :param socket: transport object; ``get_extra_info()`` is queried here
            for 'peername' and 'sockname', and ``write()`` is called on it by
            other methods of this class
        """
        self.logger = logging.getLogger(__name__)
        self.iserver = internal_server
        self.name = socket.get_extra_info('peername')
        self.sockname = socket.get_extra_info('sockname')
        self.session = None
        self.channel = None
        self.socket = socket
        self._socketlock = Lock()  # held while building and writing a response
        self._datalock = RLock()  # guards the two publish queues below
        self._publishdata_queue = []  # pending PublishRequestData, oldest first
        self._publish_result_queue = []  # used when we need to wait for PublishRequest
        self._connection = ua.SecureConnection(ua.SecurityPolicy())
35
36 1
    def set_policies(self, policies):
        """
        Hand the given security policy factories to the underlying connection.

        :param policies: passed unchanged to
            ``self._connection.set_policy_factories()``
        """
        self._connection.set_policy_factories(policies)
38
39 1
    def send_response(self, requesthandle, algohdr, seqhdr, response, msgtype=ua.MessageType.SecureMessage):
        """
        Serialize a response and write it to the socket.

        :param requesthandle: copied into ``response.ResponseHeader.RequestHandle``
        :param algohdr: accepted for call-site symmetry; not used in this method
        :param seqhdr: sequence header of the request; its ``RequestId`` is
            passed to the connection when the message is encoded
        :param response: response object providing ``ResponseHeader`` and
            ``to_binary()``
        :param msgtype: message type passed to the connection encoder
        """
        # The socket lock is held for the whole encode-and-write sequence.
        with self._socketlock:
            response.ResponseHeader.RequestHandle = requesthandle
            data = self._connection.message_to_binary(response.to_binary(), msgtype, seqhdr.RequestId)
            self.socket.write(data)
44
45 1
    def open_secure_channel(self, algohdr, seqhdr, body):
        """
        Handle an OpenSecureChannel request and send the response.

        :param algohdr: security header of the request; supplies the policy
            URI and sender certificate used to select the security policy
        :param seqhdr: sequence header of the request, forwarded to
            ``send_response``
        :param body: binary buffer the OpenSecureChannelRequest is decoded from
        """
        request = ua.OpenSecureChannelRequest.from_binary(body)

        # The policy must be selected before the channel is (re)created, because
        # _open_secure_channel reads the key size from the active policy.
        self._connection.select_policy(algohdr.SecurityPolicyURI, algohdr.SenderCertificate, request.Parameters.SecurityMode)
        channel = self._open_secure_channel(request.Parameters)

        # send response
        response = ua.OpenSecureChannelResponse()
        response.Parameters = channel
        self.send_response(request.RequestHeader.RequestHandle, algohdr, seqhdr, response, ua.MessageType.SecureOpen)
54
55 1
    def forward_publish_response(self, result):
        """
        Send a publish result to the client, answering the oldest usable
        PublishRequest.

        Queued requests are consumed oldest first. A request whose TimeoutHint
        has elapsed is dropped and the next one is tried. If no request is
        left, ``result`` is appended to ``self._publish_result_queue`` and the
        method returns without sending anything.

        :param result: value assigned to ``PublishResponse.Parameters``
        """
        self.logger.info("forward publish response %s", result)
        with self._datalock:
            while True:
                if not self._publishdata_queue:
                    self._publish_result_queue.append(result)
                    self.logger.info("Server wants to send publish answer but no publish request is available, enqueing notification, length of result queue is %s", len(self._publish_result_queue))
                    return
                requestdata = self._publishdata_queue.pop(0)
                timeout_hint = requestdata.requesthdr.TimeoutHint
                # A TimeoutHint of 0 means the client set no timeout, so such a
                # request never expires and can always be answered.
                if timeout_hint == 0:
                    break
                # TimeoutHint is compared in seconds against time.time(); divide
                # by a float so Python 2 does not truncate sub-second hints to 0.
                if time.time() - requestdata.timestamp < timeout_hint / 1000.0:
                    break

        response = ua.PublishResponse()
        response.Parameters = result

        self.send_response(requestdata.requesthdr.RequestHandle, requestdata.algohdr, requestdata.seqhdr, response)
71
72 1
    def process(self, header, body):
        """
        Decode one incoming message and dispatch it by type.

        :param header: message header; ``MessageType`` and ``ChannelId`` are read
        :param body: message body, decoded together with ``header`` by the
            connection
        :return: False for a SecureClose message; the return value of
            ``process_message`` for a SecureMessage; True in all other handled
            cases. (Presumably False tells the caller to stop serving this
            connection -- confirm against the caller.)
        :raises utils.ServiceError: BadTcpMessageTypeInvalid when the decoded
            message is none of the supported types
        """
        msg = self._connection.receive_from_header_and_body(header, body)
        if isinstance(msg, ua.Message):
            if header.MessageType == ua.MessageType.SecureOpen:
                self.open_secure_channel(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())

            elif header.MessageType == ua.MessageType.SecureClose:
                # A mismatching channel id is only logged; the method returns
                # False either way.
                if not self.channel or header.ChannelId != self.channel.SecurityToken.ChannelId:
                    self.logger.warning("Request to close channel %s which was not issued, current channel is %s", header.ChannelId, self.channel)
                return False

            elif header.MessageType == ua.MessageType.SecureMessage:
                return self.process_message(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())

        elif isinstance(msg, ua.Hello):
            # Answer Hello with an Acknowledge echoing the client's buffer sizes.
            ack = ua.Acknowledge()
            ack.ReceiveBufferSize = msg.ReceiveBufferSize
            ack.SendBufferSize = msg.SendBufferSize
            data = self._connection.tcp_to_binary(ua.MessageType.Acknowledge, ack)
            self.socket.write(data)

        elif isinstance(msg, ua.ErrorMessage):
            self.logger.warning("Received an error message type")

        else:
            self.logger.warning("Unsupported message type: %s", header.MessageType)
            raise utils.ServiceError(ua.StatusCodes.BadTcpMessageTypeInvalid)
        return True
100
101 1
    def process_message(self, algohdr, seqhdr, body):
        """
        Decode the type id and request header of a service request and run it.

        A ``utils.ServiceError`` raised while handling the request is turned
        into a ServiceFault response carrying the error's status code.

        :param algohdr: security header, forwarded to the handler and responses
        :param seqhdr: sequence header, forwarded to the handler and responses
        :param body: binary buffer; the NodeId and RequestHeader are read from
            it here, the remainder is decoded by ``_process_message``
        :return: the result of ``_process_message``, or True after a
            ServiceFault has been sent
        """
        typeid = ua.NodeId.from_binary(body)
        requesthdr = ua.RequestHeader.from_binary(body)
        try:
            return self._process_message(typeid, requesthdr, algohdr, seqhdr, body)
        except utils.ServiceError as e:
            status = ua.StatusCode(e.code)
            response = ua.ServiceFault()
            response.ResponseHeader.ServiceResult = status
            self.logger.info("sending service fault response: %s (%s)", status.doc, status.name)
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
            return True
113
114 1
    def _process_message(self, typeid, requesthdr, algohdr, seqhdr, body):
        """
        Dispatch one decoded service request by its type id and send the reply.

        Each branch decodes its parameters from ``body``, calls the matching
        method on the session or the internal server, and sends the response
        through ``send_response``. The Publish branch is the exception: it only
        queues the request, and the reply is sent later by
        ``forward_publish_response``.

        NOTE(review): apart from the ActivateSession and Publish branches, the
        session-based branches use ``self.session`` without checking that a
        session has been created.

        :param typeid: NodeId identifying the request type
        :param requesthdr: decoded RequestHeader of the request
        :param algohdr: security header, forwarded to ``send_response``
        :param seqhdr: sequence header, forwarded to ``send_response``
        :param body: binary buffer holding the request parameters
        :return: False for a Publish request without a session and for
            CloseSecureChannel; True otherwise
        :raises utils.ServiceError: BadSessionIdInvalid when activating a
            non-existing session, BadNotImplemented for an unknown type id
        """
        if typeid == ua.NodeId(ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Create session request")
            params = ua.CreateSessionParameters.from_binary(body)

            self.session = self.iserver.create_session(self.name, external=True)  # create the session on server
            sessiondata = self.session.create_session(params, sockname=self.sockname)  # get a session creation result to send back

            response = ua.CreateSessionResponse()
            response.Parameters = sessiondata
            response.Parameters.ServerCertificate = self._connection._security_policy.client_certificate
            response.Parameters.ServerSignature.Signature = self._connection._security_policy.asymmetric_cryptography.signature(self._connection._security_policy.server_certificate + params.ClientNonce)
            response.Parameters.ServerSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"

            self.logger.info("sending create sesssion response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Close session request")
            deletesubs = ua.unpack_uatype('Boolean', body)

            self.session.close_session(deletesubs)

            response = ua.CloseSessionResponse()
            self.logger.info("sending close sesssion response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Activate session request")
            params = ua.ActivateSessionParameters.from_binary(body)

            if not self.session:
                self.logger.info("request to activate non-existing session")
                raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)

            self._connection._security_policy.asymmetric_cryptography.verify(self._connection._security_policy.client_certificate + self.session.nonce, params.ClientSignature.Signature)

            result = self.session.activate_session(params)

            response = ua.ActivateSessionResponse()
            response.Parameters = result

            # The message used to say "sending read response" (copy-paste slip).
            self.logger.info("sending activate session response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ReadRequest_Encoding_DefaultBinary):
            self.logger.info("Read request")
            params = ua.ReadParameters.from_binary(body)

            results = self.session.read(params)

            response = ua.ReadResponse()
            response.Results = results

            self.logger.info("sending read response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.WriteRequest_Encoding_DefaultBinary):
            self.logger.info("Write request")
            params = ua.WriteParameters.from_binary(body)

            results = self.session.write(params)

            response = ua.WriteResponse()
            response.Results = results

            self.logger.info("sending write response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.BrowseRequest_Encoding_DefaultBinary):
            self.logger.info("Browse request")
            params = ua.BrowseParameters.from_binary(body)

            results = self.session.browse(params)

            response = ua.BrowseResponse()
            response.Results = results

            self.logger.info("sending browse response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary):
            self.logger.info("get endpoints request")
            params = ua.GetEndpointsParameters.from_binary(body)

            endpoints = self.iserver.get_endpoints(params, sockname=self.sockname)

            response = ua.GetEndpointsResponse()
            response.Endpoints = endpoints

            self.logger.info("sending get endpoints response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.FindServersRequest_Encoding_DefaultBinary):
            self.logger.info("find servers request")
            params = ua.FindServersParameters.from_binary(body)

            servers = self.iserver.find_servers(params)

            response = ua.FindServersResponse()
            response.Servers = servers

            self.logger.info("sending find servers response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServerRequest_Encoding_DefaultBinary):
            self.logger.info("register server request")
            serv = ua.RegisteredServer.from_binary(body)

            self.iserver.register_server(serv)

            response = ua.RegisterServerResponse()

            self.logger.info("sending register server response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServer2Request_Encoding_DefaultBinary):
            self.logger.info("register server 2 request")
            params = ua.RegisterServer2Parameters.from_binary(body)

            results = self.iserver.register_server2(params)

            response = ua.RegisterServer2Response()
            response.ConfigurationResults = results

            self.logger.info("sending register server 2 response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary):
            self.logger.info("translate browsepaths to nodeids request")
            params = ua.TranslateBrowsePathsToNodeIdsParameters.from_binary(body)

            paths = self.session.translate_browsepaths_to_nodeids(params.BrowsePaths)

            response = ua.TranslateBrowsePathsToNodeIdsResponse()
            response.Results = paths

            self.logger.info("sending translate browsepaths to nodeids response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary):
            self.logger.info("add nodes request")
            params = ua.AddNodesParameters.from_binary(body)

            results = self.session.add_nodes(params.NodesToAdd)

            response = ua.AddNodesResponse()
            response.Results = results

            self.logger.info("sending add node response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteNodesRequest_Encoding_DefaultBinary):
            self.logger.info("delete nodes request")
            params = ua.DeleteNodesParameters.from_binary(body)

            results = self.session.delete_nodes(params.NodesToDelete)

            response = ua.DeleteNodesResponse()
            response.Results = results

            self.logger.info("sending delete node response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CreateSubscriptionRequest_Encoding_DefaultBinary):
            self.logger.info("create subscription request")
            params = ua.CreateSubscriptionParameters.from_binary(body)

            # forward_publish_response is handed over as the callback through
            # which publish results for this subscription reach the client.
            result = self.session.create_subscription(params, self.forward_publish_response)

            response = ua.CreateSubscriptionResponse()
            response.Parameters = result

            self.logger.info("sending create subscription response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteSubscriptionsRequest_Encoding_DefaultBinary):
            self.logger.info("delete subscriptions request")
            params = ua.DeleteSubscriptionsParameters.from_binary(body)

            results = self.session.delete_subscriptions(params.SubscriptionIds)

            response = ua.DeleteSubscriptionsResponse()
            response.Results = results

            self.logger.info("sending delte subscription response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CreateMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("create monitored items request")
            params = ua.CreateMonitoredItemsParameters.from_binary(body)
            results = self.session.create_monitored_items(params)

            response = ua.CreateMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending create monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ModifyMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("modify monitored items request")
            params = ua.ModifyMonitoredItemsParameters.from_binary(body)
            results = self.session.modify_monitored_items(params)

            response = ua.ModifyMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending modify monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("delete monitored items request")
            params = ua.DeleteMonitoredItemsParameters.from_binary(body)

            results = self.session.delete_monitored_items(params)

            response = ua.DeleteMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending delete monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.HistoryReadRequest_Encoding_DefaultBinary):
            self.logger.info("history read request")
            params = ua.HistoryReadParameters.from_binary(body)

            results = self.session.history_read(params)

            response = ua.HistoryReadResponse()
            response.Results = results

            self.logger.info("sending history read response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.PublishRequest_Encoding_DefaultBinary):
            self.logger.info("publish request")

            if not self.session:
                return False

            params = ua.PublishParameters.from_binary(body)

            data = PublishRequestData()
            data.requesthdr = requesthdr
            data.seqhdr = seqhdr
            data.algohdr = algohdr
            with self._datalock:
                self._publishdata_queue.append(data)  # will be used to send publish answers from server
                # If a result was parked while no request was available, answer
                # right away. _datalock is an RLock, so the nested acquire in
                # forward_publish_response does not deadlock.
                if self._publish_result_queue:
                    result = self._publish_result_queue.pop(0)
                    self.forward_publish_response(result)
            self.session.publish(params.SubscriptionAcknowledgements)
            self.logger.info("publish forward to server")

        elif typeid == ua.NodeId(ua.ObjectIds.RepublishRequest_Encoding_DefaultBinary):
            self.logger.info("re-publish request")

            params = ua.RepublishParameters.from_binary(body)
            msg = self.session.republish(params)

            response = ua.RepublishResponse()
            response.NotificationMessage = msg

            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CloseSecureChannelRequest_Encoding_DefaultBinary):
            self.logger.info("close secure channel request")
            response = ua.CloseSecureChannelResponse()
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
            self.channel = None
            return False

        elif typeid == ua.NodeId(ua.ObjectIds.CallRequest_Encoding_DefaultBinary):
            self.logger.info("call request")

            params = ua.CallParameters.from_binary(body)

            results = self.session.call(params.MethodsToCall)

            response = ua.CallResponse()
            response.Results = results

            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        else:
            self.logger.warning("Unknown message received %s", typeid)
            raise utils.ServiceError(ua.StatusCodes.BadNotImplemented)

        return True
403
404 1
    def _open_secure_channel(self, params):
        """
        Create or renew the secure channel and derive fresh symmetric keys.

        A new channel (with a new channel id from the internal server) is
        created when none exists yet or when the request type is Issue;
        otherwise the existing channel is reused. In both cases the token id is
        incremented, the creation time and lifetime are refreshed, and a new
        server nonce is generated.

        :param params: request parameters; ``RequestType``,
            ``RequestedLifetime`` and ``ClientNonce`` are read
        :return: the current ``ua.OpenSecureChannelResult`` (``self.channel``)
        """
        self.logger.info("open secure channel")
        if not self.channel or params.RequestType == ua.SecurityTokenRequestType.Issue:
            self.channel = ua.OpenSecureChannelResult()
            # Arbitrary starting value. It is incremented below even for a new
            # channel, so the first token id handed out is 14.
            self.channel.SecurityToken.TokenId = 13
            self.channel.SecurityToken.ChannelId = self.iserver.get_new_channel_id()
        self.channel.SecurityToken.TokenId += 1
        self.channel.SecurityToken.CreatedAt = datetime.utcnow()
        # The requested lifetime is granted as-is, for new and renewed tokens alike.
        self.channel.SecurityToken.RevisedLifetime = params.RequestedLifetime
        self.channel.ServerNonce = utils.create_nonce(self._connection._security_policy.symmetric_key_size)
        self._connection.set_security_token(self.channel.SecurityToken)
        self._connection._security_policy.make_symmetric_key(self.channel.ServerNonce, params.ClientNonce)
        return self.channel
418
419 1
    def close(self):
        """
        to be called when client has disconnected to ensure we really close
        everything we should

        Closes the session, if one exists, via ``close_session(True)``.
        """
        # Use the instance logger like the rest of the class instead of
        # printing to stdout.
        self.logger.info("Cleanup client connection: %s", self.name)
        if self.session:
            self.session.close_session(True)
427