Passed
Push — dev ( 3fa5c0 )
by Olivier
03:18
created

opcua.server.UaProcessor.close()   A

Complexity

Conditions 2

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2
Metric Value
cc 2
dl 0
loc 8
ccs 4
cts 4
cp 1
crap 2
rs 9.4285
1
2 1
import logging
3 1
from threading import Lock
4 1
from datetime import datetime
5
6 1
from opcua import ua
7 1
from opcua.common import utils
8
9
10 1
class PublishRequestData(object):
    """
    Plain holder for the three headers of a publish request
    (request header, algorithm header, sequence header).
    """

    def __init__(self):
        # every header starts unset; whoever creates the object fills them in
        self.requesthdr = self.algohdr = self.seqhdr = None
16
17
18 1
class UaProcessor(object):
19
20 1
    def __init__(self, internal_server, socket):
        """
        Prepare the state kept for one client connection.

        internal_server: stored as self.iserver and used to serve requests
        socket: object the responses are written to; it must also provide
        get_extra_info(), which is queried for 'peername' and 'sockname'
        """
        self.logger = logging.getLogger(__name__)
        self.iserver = internal_server
        self.socket = socket

        # addresses as reported by the socket object
        self.name = socket.get_extra_info('peername')
        self.sockname = socket.get_extra_info('sockname')

        # no session and no secure channel exist yet
        self.session = None
        self.channel = None

        # _socketlock guards writes of responses, _datalock guards the queue
        # of pending publish requests
        self._socketlock = Lock()
        self._datalock = Lock()
        self._publishdata_queue = []

        self._connection = ua.SecureConnection(ua.SecurityPolicy())
32
33 1
    def set_policies(self, policies):
        """
        Pass the given policy factories on to the connection object.
        """
        connection = self._connection
        connection.set_policy_factories(policies)
35
36 1
    def send_response(self, requesthandle, algohdr, seqhdr, response, msgtype=ua.MessageType.SecureMessage):
        """
        Serialize response and write it to the socket.

        requesthandle is copied into the response header and
        seqhdr.RequestId is handed to the connection together with msgtype
        when the message is converted to binary.
        algohdr is accepted but not used by this method.
        The whole operation runs while holding the socket lock.
        """
        with self._socketlock:
            response.ResponseHeader.RequestHandle = requesthandle
            payload = response.to_binary()
            packet = self._connection.message_to_binary(payload, msgtype, seqhdr.RequestId)
            self.socket.write(packet)
42
43 1
    def open_secure_channel(self, algohdr, seqhdr, body):
        """
        Answer an open secure channel request.

        The request is decoded from body, a security policy is selected from
        the values found in algohdr and in the request, and the channel
        returned by _open_secure_channel() is sent back as a SecureOpen
        message.
        """
        request = ua.OpenSecureChannelRequest.from_binary(body)
        parameters = request.Parameters

        self._connection.select_policy(
            algohdr.SecurityPolicyURI,
            algohdr.SenderCertificate,
            parameters.SecurityMode)
        result = self._open_secure_channel(parameters)

        # build the answer and send it
        response = ua.OpenSecureChannelResponse()
        response.Parameters = result
        self.send_response(
            request.RequestHeader.RequestHandle,
            algohdr,
            seqhdr,
            response,
            ua.MessageType.SecureOpen)
52
53 1
    def forward_publish_response(self, result):
        """
        Send result as the answer to the oldest queued publish request.

        When no publish request is queued a warning is logged and result is
        not sent.
        """
        self.logger.info("forward publish response %s", result)
        with self._datalock:
            if not self._publishdata_queue:
                self.logger.warning("Error server wants to send publish answer but no publish request is available")
                return
            pending = self._publishdata_queue.pop(0)

        # the data lock is released here; the response is written without it
        response = ua.PublishResponse()
        response.Parameters = result

        self.send_response(pending.requesthdr.RequestHandle, pending.algohdr, pending.seqhdr, response)
64
65 1
    def process(self, header, body):
        """
        Handle one incoming message given its header and body.

        Returns False for a SecureClose message, the result of
        process_message() for a SecureMessage, and True in the other
        handled cases.
        Raises utils.ServiceError(BadTcpMessageTypeInvalid) when the decoded
        message is none of ua.Message, ua.Hello or ua.ErrorMessage.
        """
        msg = self._connection.receive_from_header_and_body(header, body)

        if isinstance(msg, ua.Message):
            msgtype = header.MessageType

            if msgtype == ua.MessageType.SecureOpen:
                self.open_secure_channel(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())
                return True

            if msgtype == ua.MessageType.SecureClose:
                # a close for a channel we did not issue is only reported,
                # the return value is False either way
                if not self.channel or header.ChannelId != self.channel.SecurityToken.ChannelId:
                    self.logger.warning("Request to close channel %s which was not issued, current channel is %s", header.ChannelId, self.channel)
                return False

            if msgtype == ua.MessageType.SecureMessage:
                return self.process_message(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())

            # any other message type carried by a ua.Message is ignored
            return True

        if isinstance(msg, ua.Hello):
            # acknowledge with the buffer sizes taken from the hello message
            ack = ua.Acknowledge()
            ack.ReceiveBufferSize = msg.ReceiveBufferSize
            ack.SendBufferSize = msg.SendBufferSize
            packet = self._connection.tcp_to_binary(ua.MessageType.Acknowledge, ack)
            self.socket.write(packet)
            return True

        if isinstance(msg, ua.ErrorMessage):
            self.logger.warning("Received an error message type")
            return True

        self.logger.warning("Unsupported message type: %s", header.MessageType)
        raise utils.ServiceError(ua.StatusCodes.BadTcpMessageTypeInvalid)
93
94 1
    def process_message(self, algohdr, seqhdr, body):
        """
        Decode the type id and the request header from body and dispatch
        the request to _process_message().

        A utils.ServiceError raised while handling the request is turned
        into a ServiceFault response carrying the error code; True is
        returned in that case. Otherwise the result of _process_message()
        is returned.
        """
        # both values are decoded from body, the type id first
        typeid = ua.NodeId.from_binary(body)
        requesthdr = ua.RequestHeader.from_binary(body)
        try:
            return self._process_message(typeid, requesthdr, algohdr, seqhdr, body)
        except utils.ServiceError as error:
            status = ua.StatusCode(error.code)
            fault = ua.ServiceFault()
            fault.ResponseHeader.ServiceResult = status
            self.logger.info("sending service fault response: %s (%s)", status.doc, status.name)
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, fault)
        return True
106
107 1
    def _process_message(self, typeid, requesthdr, algohdr, seqhdr, body):
        """
        Handle one service request and send the matching response.

        typeid selects the service, the request parameters are decoded from
        body and the response is sent through send_response() with the
        request handle taken from requesthdr.

        Returns False for a publish request received while no session
        exists and for a close secure channel request, True otherwise.
        Raises utils.ServiceError(BadNotImplemented) for an unknown typeid
        and utils.ServiceError(BadSessionIdInvalid) when a session is
        activated or closed although none was created.
        """
        if typeid == ua.NodeId(ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Create session request")
            params = ua.CreateSessionParameters.from_binary(body)

            self.session = self.iserver.create_session(self.name, external=True)  # create the session on server
            sessiondata = self.session.create_session(params, sockname=self.sockname)  # get a session creation result to send back

            policy = self._connection._security_policy
            response = ua.CreateSessionResponse()
            response.Parameters = sessiondata
            # NOTE(review): the policy's client_certificate is sent as
            # ServerCertificate while server_certificate is part of the signed
            # data - presumably the policy names certificates from the peer's
            # point of view; confirm against the security policy classes
            response.Parameters.ServerCertificate = policy.client_certificate
            response.Parameters.ServerSignature.Signature = policy.asymmetric_cryptography.signature(policy.server_certificate + params.ClientNonce)
            response.Parameters.ServerSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"

            self.logger.info("sending create sesssion response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Close session request")

            if not self.session:
                # same handling as for activating a missing session: report
                # it as a service error instead of failing on None
                self.logger.info("request to close non-existing session")
                raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)

            deletesubs = ua.unpack_uatype('Boolean', body)

            self.session.close_session(deletesubs)

            response = ua.CloseSessionResponse()
            self.logger.info("sending close sesssion response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Activate session request")
            params = ua.ActivateSessionParameters.from_binary(body)

            if not self.session:
                self.logger.info("request to activate non-existing session")
                raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)

            policy = self._connection._security_policy
            policy.asymmetric_cryptography.verify(policy.client_certificate + self.session.nonce, params.ClientSignature.Signature)

            result = self.session.activate_session(params)

            response = ua.ActivateSessionResponse()
            response.Parameters = result

            self.logger.info("sending activate session response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ReadRequest_Encoding_DefaultBinary):
            self.logger.info("Read request")
            params = ua.ReadParameters.from_binary(body)

            results = self.session.read(params)

            response = ua.ReadResponse()
            response.Results = results

            self.logger.info("sending read response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.WriteRequest_Encoding_DefaultBinary):
            self.logger.info("Write request")
            params = ua.WriteParameters.from_binary(body)

            results = self.session.write(params)

            response = ua.WriteResponse()
            response.Results = results

            self.logger.info("sending write response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.BrowseRequest_Encoding_DefaultBinary):
            self.logger.info("Browse request")
            params = ua.BrowseParameters.from_binary(body)

            results = self.session.browse(params)

            response = ua.BrowseResponse()
            response.Results = results

            self.logger.info("sending browse response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary):
            self.logger.info("get endpoints request")
            params = ua.GetEndpointsParameters.from_binary(body)

            # answered by the server object, no session is involved
            endpoints = self.iserver.get_endpoints(params, sockname=self.sockname)

            response = ua.GetEndpointsResponse()
            response.Endpoints = endpoints

            self.logger.info("sending get endpoints response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.FindServersRequest_Encoding_DefaultBinary):
            self.logger.info("find servers request")
            params = ua.FindServersParameters.from_binary(body)

            servers = self.iserver.find_servers(params)

            response = ua.FindServersResponse()
            response.Servers = servers

            self.logger.info("sending find servers response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServerRequest_Encoding_DefaultBinary):
            self.logger.info("register server request")
            serv = ua.RegisteredServer.from_binary(body)

            self.iserver.register_server(serv)

            response = ua.RegisterServerResponse()

            self.logger.info("sending register server response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServer2Request_Encoding_DefaultBinary):
            self.logger.info("register server 2 request")
            params = ua.RegisterServer2Parameters.from_binary(body)

            results = self.iserver.register_server2(params)

            response = ua.RegisterServer2Response()
            response.ConfigurationResults = results

            self.logger.info("sending register server 2 response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary):
            self.logger.info("translate browsepaths to nodeids request")
            params = ua.TranslateBrowsePathsToNodeIdsParameters.from_binary(body)

            paths = self.session.translate_browsepaths_to_nodeids(params.BrowsePaths)

            response = ua.TranslateBrowsePathsToNodeIdsResponse()
            response.Results = paths

            self.logger.info("sending translate browsepaths to nodeids response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary):
            self.logger.info("add nodes request")
            params = ua.AddNodesParameters.from_binary(body)

            results = self.session.add_nodes(params.NodesToAdd)

            response = ua.AddNodesResponse()
            response.Results = results

            self.logger.info("sending add node response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteNodesRequest_Encoding_DefaultBinary):
            self.logger.info("delete nodes request")
            params = ua.DeleteNodesParameters.from_binary(body)

            results = self.session.delete_nodes(params.NodesToDelete)

            response = ua.DeleteNodesResponse()
            response.Results = results

            self.logger.info("sending delete node response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CreateSubscriptionRequest_Encoding_DefaultBinary):
            self.logger.info("create subscription request")
            params = ua.CreateSubscriptionParameters.from_binary(body)

            # forward_publish_response is handed over as the callback used to
            # deliver publish results for this subscription
            result = self.session.create_subscription(params, self.forward_publish_response)

            response = ua.CreateSubscriptionResponse()
            response.Parameters = result

            self.logger.info("sending create subscription response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteSubscriptionsRequest_Encoding_DefaultBinary):
            self.logger.info("delete subscriptions request")
            params = ua.DeleteSubscriptionsParameters.from_binary(body)

            results = self.session.delete_subscriptions(params.SubscriptionIds)

            response = ua.DeleteSubscriptionsResponse()
            response.Results = results

            self.logger.info("sending delte subscription response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CreateMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("create monitored items request")
            params = ua.CreateMonitoredItemsParameters.from_binary(body)
            results = self.session.create_monitored_items(params)

            response = ua.CreateMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending create monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ModifyMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("modify monitored items request")
            params = ua.ModifyMonitoredItemsParameters.from_binary(body)
            results = self.session.modify_monitored_items(params)

            response = ua.ModifyMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending modify monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("delete monitored items request")
            params = ua.DeleteMonitoredItemsParameters.from_binary(body)

            results = self.session.delete_monitored_items(params)

            response = ua.DeleteMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending delete monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.PublishRequest_Encoding_DefaultBinary):
            self.logger.info("publish request")

            if not self.session:
                return False

            params = ua.PublishParameters.from_binary(body)

            # no response is sent here: the headers are queued and used by
            # forward_publish_response() once the server has something to send
            data = PublishRequestData()
            data.requesthdr = requesthdr
            data.seqhdr = seqhdr
            data.algohdr = algohdr
            with self._datalock:
                self._publishdata_queue.append(data)  # will be used to send publish answers from server
            self.session.publish(params.SubscriptionAcknowledgements)
            self.logger.info("publish forward to server")

        elif typeid == ua.NodeId(ua.ObjectIds.RepublishRequest_Encoding_DefaultBinary):
            self.logger.info("re-publish request")

            params = ua.RepublishParameters.from_binary(body)
            msg = self.session.republish(params)

            response = ua.RepublishResponse()
            response.NotificationMessage = msg

            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CloseSecureChannelRequest_Encoding_DefaultBinary):
            self.logger.info("close secure channel request")
            response = ua.CloseSecureChannelResponse()
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
            # the channel is forgotten only after the answer has been sent
            self.channel = None
            return False

        elif typeid == ua.NodeId(ua.ObjectIds.CallRequest_Encoding_DefaultBinary):
            self.logger.info("call request")

            params = ua.CallParameters.from_binary(body)

            results = self.session.call(params.MethodsToCall)

            response = ua.CallResponse()
            response.Results = results

            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        else:
            self.logger.warning("Unknown message received %s", typeid)
            raise utils.ServiceError(ua.StatusCodes.BadNotImplemented)

        return True
381
382 1
    def _open_secure_channel(self, params):
        """
        Create the secure channel or refresh the existing one and return it.

        A new channel (with a new channel id from the server object) is
        created when none exists yet or when params.RequestType is Issue.
        In every case the token id is incremented, the creation time and
        lifetime are updated, a new server nonce is generated and the
        symmetric key is derived from the server and client nonces.
        """
        self.logger.info("open secure channel")

        need_new_channel = not self.channel or params.RequestType == ua.SecurityTokenRequestType.Issue
        if need_new_channel:
            self.channel = ua.OpenSecureChannelResult()
            fresh_token = self.channel.SecurityToken
            fresh_token.TokenId = 13  # random value
            fresh_token.ChannelId = self.iserver.get_new_channel_id()
            # assigned again below, for new and existing channels alike
            fresh_token.RevisedLifetime = params.RequestedLifetime

        channel = self.channel
        token = channel.SecurityToken
        token.TokenId += 1
        token.CreatedAt = datetime.utcnow()
        token.RevisedLifetime = params.RequestedLifetime

        channel.ServerNonce = utils.create_nonce(self._connection._security_policy.symmetric_key_size)
        self._connection.set_security_token(token)
        self._connection._security_policy.make_symmetric_key(channel.ServerNonce, params.ClientNonce)
        return channel
396
397 1
    def close(self):
        """
        to be called when client has disconnected to ensure we really close
        everything we should

        If a session exists it is closed with True passed to
        close_session(); without a session only the log entry is written.
        """
        # reported through the logger like everything else in this class
        # instead of being printed to stdout
        self.logger.info("Cleanup client connection: %s", self.name)
        if self.session:
            self.session.close_session(True)
405