Completed
Pull Request — master (#105)
by
unknown
06:30
created

opcua.server.UAProcessor.send_response()   A

Complexity

Conditions 2

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2
Metric Value
cc 2
dl 0
loc 6
ccs 5
cts 5
cp 1
crap 2
rs 9.4286
1
2 1
import logging
3 1
from threading import Lock
4 1
from datetime import datetime
5
6 1
from opcua import ua
7 1
from opcua.common import utils
8
9
10 1
class PublishRequestData(object):
    """
    Headers of a publish request that has been received but not answered
    yet. They are kept so that a response can be built later on.

    All three values default to None and may also be assigned after
    construction.
    """

    def __init__(self, requesthdr=None, algohdr=None, seqhdr=None):
        # request header; its RequestHandle is copied into the response
        self.requesthdr = requesthdr
        # security header the request was received with
        self.algohdr = algohdr
        # sequence header; its RequestId is reused when sending the response
        self.seqhdr = seqhdr
16
17
18 1
class UAProcessor(object):
19
20 1
    def __init__(self, internal_server, socket):
        """
        Set up the state kept for one client connection.

        internal_server: server object used to create sessions, answer
            discovery requests and allocate channel ids
        socket: transport of the connection; get_extra_info() and write()
            are the only methods used on it by this class
        """
        self.logger = logging.getLogger(__name__)
        self.iserver = internal_server
        self.name = socket.get_extra_info('peername')
        self.sockname = socket.get_extra_info('sockname')
        self.session = None  # assigned when a CreateSession request is processed
        self.channel = None  # assigned when a secure channel is opened
        self.socket = socket
        self._socketlock = Lock()  # held while a response is built and written
        self._datalock = Lock()  # guards _publishdata_queue
        self._publishdata_queue = []  # unanswered publish requests, oldest first
        self._connection = ua.SecureConnection(ua.SecurityPolicy())
32
33 1
    def set_policies(self, policies):
        """
        Pass the given security policy factories on to the secure connection.
        """
        self._connection.set_policy_factories(policies)
35
36 1
    def send_response(self, requesthandle, algohdr, seqhdr, response, msgtype=ua.MessageType.SecureMessage):
        """
        Serialize response and write it to the socket.

        requesthandle is stored in the response header and seqhdr.RequestId
        is handed to the connection when the message is built.
        algohdr is not used by this method.
        """
        # one lock around header update, serialization and write, so that
        # responses sent from different callers are not interleaved
        with self._socketlock:
            response.ResponseHeader.RequestHandle = requesthandle
            data = self._connection.message_to_binary(response.to_binary(), msgtype, seqhdr.RequestId)
            self.socket.write(data)
42
43 1
    def open_secure_channel(self, algohdr, seqhdr, body):
        """
        Handle an OpenSecureChannel request.

        The request is decoded from body, the security policy named in
        algohdr is selected on the connection, the channel is created or
        renewed and the result is sent back as a SecureOpen message.
        """
        request = ua.OpenSecureChannelRequest.from_binary(body)

        self._connection.select_policy(algohdr.SecurityPolicyURI, algohdr.SenderCertificate, request.Parameters.SecurityMode)
        channel = self._open_secure_channel(request.Parameters)

        # send response
        response = ua.OpenSecureChannelResponse()
        response.Parameters = channel
        self.send_response(request.RequestHeader.RequestHandle, algohdr, seqhdr, response, ua.MessageType.SecureOpen)
52
53 1
    def forward_publish_response(self, result):
        """
        Send result as the answer to the oldest queued publish request.

        This method is handed to the session as callback when a
        subscription is created. If no publish request is queued, a warning
        is logged and result is dropped.
        """
        self.logger.info("forward publish response %s", result)
        with self._datalock:
            if not self._publishdata_queue:
                self.logger.warning("Error server wants to send publish answer but no publish request is available")
                return
            requestdata = self._publishdata_queue.pop(0)
        # the response is built and written after _datalock has been released
        response = ua.PublishResponse()
        response.Parameters = result

        self.send_response(requestdata.requesthdr.RequestHandle, requestdata.algohdr, requestdata.seqhdr, response)
64
65 1
    def process(self, header, body):
        """
        Process one received chunk given as header and body.

        Returns False for a SecureClose message, the result of
        process_message() for a SecureMessage and True in every other
        handled case (intermediate chunk, SecureOpen, Hello, error message).
        Raises utils.ServiceError(BadTcpMessageTypeInvalid) when the
        connection returns an object of an unsupported type.
        NOTE(review): presumably the caller closes the connection when
        False is returned -- confirm against the caller.
        """
        msg = self._connection.receive_from_header_and_body(header, body)
        if msg is None:
            pass    # received intermediate chunk
        elif isinstance(msg, ua.Message):
            if header.MessageType == ua.MessageType.SecureOpen:
                self.open_secure_channel(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())

            elif header.MessageType == ua.MessageType.SecureClose:
                if not self.channel or header.ChannelId != self.channel.SecurityToken.ChannelId:
                    self.logger.warning("Request to close channel %s which was not issued, current channel is %s", header.ChannelId, self.channel)
                # False is returned whether or not the channel id matched
                return False

            elif header.MessageType == ua.MessageType.SecureMessage:
                return self.process_message(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())

        elif isinstance(msg, ua.Hello):
            # acknowledge with the buffer sizes announced by the client
            ack = ua.Acknowledge()
            ack.ReceiveBufferSize = msg.ReceiveBufferSize
            ack.SendBufferSize = msg.SendBufferSize
            data = self._connection.tcp_to_binary(ua.MessageType.Acknowledge, ack)
            # take the same lock as send_response() so that every write to
            # the socket is serialized
            with self._socketlock:
                self.socket.write(data)

        elif isinstance(msg, ua.ErrorMessage):
            self.logger.warning("Received an error message type")

        else:
            self.logger.warning("Unsupported message type: %s", header.MessageType)
            raise utils.ServiceError(ua.StatusCodes.BadTcpMessageTypeInvalid)
        return True
95
96 1
    def process_message(self, algohdr, seqhdr, body):
        """
        Decode type id and request header from body and dispatch the request.

        A utils.ServiceError raised while handling the request is answered
        with a ServiceFault carrying the error code; True is returned in
        that case. Otherwise the result of _process_message() is returned.
        """
        # NOTE(review): both from_binary() calls read from the same body
        # object; presumably it is a buffer that is consumed while
        # decoding -- confirm
        typeid = ua.NodeId.from_binary(body)
        requesthdr = ua.RequestHeader.from_binary(body)
        try:
            return self._process_message(typeid, requesthdr, algohdr, seqhdr, body)
        except utils.ServiceError as e:
            status = ua.StatusCode(e.code)
            response = ua.ServiceFault()
            response.ResponseHeader.ServiceResult = status
            self.logger.info("sending service fault response: %s (%s)", status.doc, status.name)
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
            return True
108
109 1
    def _process_message(self, typeid, requesthdr, algohdr, seqhdr, body):
        """
        Handle one service request.

        typeid selects the service. The parameters are decoded from body,
        the call is forwarded to the session or the internal server and the
        response is sent with send_response(). A publish request is not
        answered here: its headers are queued and the answer is sent by
        forward_publish_response().

        Returns True, except for a publish request received without a
        session and for a CloseSecureChannel request, which return False.
        Raises utils.ServiceError with BadSessionIdInvalid when a session is
        activated before one was created, and with BadNotImplemented for an
        unknown typeid.
        """
        if typeid == ua.NodeId(ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Create session request")
            params = ua.CreateSessionParameters.from_binary(body)

            self.session = self.iserver.create_session(self.name, external=True)  # create the session on server
            sessiondata = self.session.create_session(params, sockname=self.sockname)  # get a session creation result to send back

            policy = self._connection._security_policy
            response = ua.CreateSessionResponse()
            response.Parameters = sessiondata
            # NOTE(review): the policy's client_certificate is sent as
            # ServerCertificate while server_certificate is part of the
            # signed data -- confirm the naming used by the policy class
            response.Parameters.ServerCertificate = policy.client_certificate
            response.Parameters.ServerSignature.Signature = policy.asymmetric_cryptography.signature(policy.server_certificate + params.ClientNonce)
            response.Parameters.ServerSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"

            self.logger.info("sending create sesssion response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Close session request")
            deletesubs = ua.unpack_uatype('Boolean', body)

            self.session.close_session(deletesubs)

            response = ua.CloseSessionResponse()
            self.logger.info("sending close sesssion response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary):
            self.logger.info("Activate session request")
            params = ua.ActivateSessionParameters.from_binary(body)

            if not self.session:
                self.logger.info("request to activate non-existing session")
                raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)

            policy = self._connection._security_policy
            policy.asymmetric_cryptography.verify(policy.client_certificate + self.session.nonce, params.ClientSignature.Signature)

            result = self.session.activate_session(params)

            response = ua.ActivateSessionResponse()
            response.Parameters = result

            # this message used to read "sending read response"
            self.logger.info("sending activate session response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ReadRequest_Encoding_DefaultBinary):
            self.logger.info("Read request")
            params = ua.ReadParameters.from_binary(body)

            results = self.session.read(params)

            response = ua.ReadResponse()
            response.Results = results

            self.logger.info("sending read response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.WriteRequest_Encoding_DefaultBinary):
            self.logger.info("Write request")
            params = ua.WriteParameters.from_binary(body)

            results = self.session.write(params)

            response = ua.WriteResponse()
            response.Results = results

            self.logger.info("sending write response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.BrowseRequest_Encoding_DefaultBinary):
            self.logger.info("Browse request")
            params = ua.BrowseParameters.from_binary(body)

            results = self.session.browse(params)

            response = ua.BrowseResponse()
            response.Results = results

            self.logger.info("sending browse response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary):
            self.logger.info("get endpoints request")
            params = ua.GetEndpointsParameters.from_binary(body)

            endpoints = self.iserver.get_endpoints(params, sockname=self.sockname)

            response = ua.GetEndpointsResponse()
            response.Endpoints = endpoints

            self.logger.info("sending get endpoints response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.FindServersRequest_Encoding_DefaultBinary):
            self.logger.info("find servers request")
            params = ua.FindServersParameters.from_binary(body)

            servers = self.iserver.find_servers(params)

            response = ua.FindServersResponse()
            response.Servers = servers

            self.logger.info("sending find servers response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServerRequest_Encoding_DefaultBinary):
            self.logger.info("register server request")
            serv = ua.RegisteredServer.from_binary(body)

            self.iserver.register_server(serv)

            response = ua.RegisterServerResponse()

            self.logger.info("sending register server response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServer2Request_Encoding_DefaultBinary):
            self.logger.info("register server 2 request")
            params = ua.RegisterServer2Parameters.from_binary(body)

            results = self.iserver.register_server2(params)

            response = ua.RegisterServer2Response()
            response.ConfigurationResults = results

            self.logger.info("sending register server 2 response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary):
            self.logger.info("translate browsepaths to nodeids request")
            params = ua.TranslateBrowsePathsToNodeIdsParameters.from_binary(body)

            paths = self.session.translate_browsepaths_to_nodeids(params.BrowsePaths)

            response = ua.TranslateBrowsePathsToNodeIdsResponse()
            response.Results = paths

            self.logger.info("sending translate browsepaths to nodeids response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary):
            self.logger.info("add nodes request")
            params = ua.AddNodesParameters.from_binary(body)

            results = self.session.add_nodes(params.NodesToAdd)

            response = ua.AddNodesResponse()
            response.Results = results

            self.logger.info("sending add node response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CreateSubscriptionRequest_Encoding_DefaultBinary):
            self.logger.info("create subscription request")
            params = ua.CreateSubscriptionParameters.from_binary(body)

            # forward_publish_response is the callback through which publish
            # results for this subscription are sent to the client
            result = self.session.create_subscription(params, self.forward_publish_response)

            response = ua.CreateSubscriptionResponse()
            response.Parameters = result

            self.logger.info("sending create subscription response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteSubscriptionsRequest_Encoding_DefaultBinary):
            self.logger.info("delete subscriptions request")
            params = ua.DeleteSubscriptionsParameters.from_binary(body)

            results = self.session.delete_subscriptions(params.SubscriptionIds)

            response = ua.DeleteSubscriptionsResponse()
            response.Results = results

            self.logger.info("sending delte subscription response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CreateMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("create monitored items request")
            params = ua.CreateMonitoredItemsParameters.from_binary(body)
            results = self.session.create_monitored_items(params)

            response = ua.CreateMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending create monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.ModifyMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("modify monitored items request")
            params = ua.ModifyMonitoredItemsParameters.from_binary(body)
            results = self.session.modify_monitored_items(params)

            response = ua.ModifyMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending modify monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.DeleteMonitoredItemsRequest_Encoding_DefaultBinary):
            self.logger.info("delete monitored items request")
            params = ua.DeleteMonitoredItemsParameters.from_binary(body)

            results = self.session.delete_monitored_items(params)

            response = ua.DeleteMonitoredItemsResponse()
            response.Results = results

            self.logger.info("sending delete monitored items response")
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.PublishRequest_Encoding_DefaultBinary):
            self.logger.info("publish request")

            if not self.session:
                return False

            params = ua.PublishParameters.from_binary(body)

            data = PublishRequestData()
            data.requesthdr = requesthdr
            data.seqhdr = seqhdr
            data.algohdr = algohdr
            with self._datalock:
                self._publishdata_queue.append(data)  # will be used to send publish answers from server
            self.session.publish(params.SubscriptionAcknowledgements)
            self.logger.info("publish forward to server")

        elif typeid == ua.NodeId(ua.ObjectIds.RepublishRequest_Encoding_DefaultBinary):
            self.logger.info("re-publish request")

            params = ua.RepublishParameters.from_binary(body)
            msg = self.session.republish(params)

            response = ua.RepublishResponse()
            response.NotificationMessage = msg

            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        elif typeid == ua.NodeId(ua.ObjectIds.CloseSecureChannelRequest_Encoding_DefaultBinary):
            self.logger.info("close secure channel request")
            response = ua.CloseSecureChannelResponse()
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
            self.channel = None
            return False

        elif typeid == ua.NodeId(ua.ObjectIds.CallRequest_Encoding_DefaultBinary):
            self.logger.info("call request")

            params = ua.CallParameters.from_binary(body)

            results = self.session.call(params.MethodsToCall)

            response = ua.CallResponse()
            response.Results = results

            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        else:
            self.logger.warning("Unknown message received %s", typeid)
            raise utils.ServiceError(ua.StatusCodes.BadNotImplemented)

        return True
371
372 1
    def _open_secure_channel(self, params):
        """
        Create or renew the secure channel of this connection.

        A new channel with a fresh channel id is created when none exists
        yet or when params.RequestType is Issue. In every case the token id
        is incremented, creation time and lifetime are updated, a new
        server nonce is generated and the symmetric keys are derived from
        the server and client nonces.

        Returns the channel result stored in self.channel.
        """
        self.logger.info("open secure channel")
        if not self.channel or params.RequestType == ua.SecurityTokenRequestType.Issue:
            self.channel = ua.OpenSecureChannelResult()
            self.channel.SecurityToken.TokenId = 13  # random value
            self.channel.SecurityToken.ChannelId = self.iserver.get_new_channel_id()
        self.channel.SecurityToken.TokenId += 1
        # OPC UA timestamps are UTC; naive local time would be off by the
        # local UTC offset
        self.channel.SecurityToken.CreatedAt = datetime.utcnow()
        # the requested lifetime is accepted unchanged, for new and renewed
        # tokens alike
        self.channel.SecurityToken.RevisedLifetime = params.RequestedLifetime
        self.channel.ServerNonce = utils.create_nonce(self._connection._security_policy.symmetric_key_size)
        self._connection.set_security_token(self.channel.SecurityToken)
        self._connection._security_policy.make_symmetric_key(self.channel.ServerNonce, params.ClientNonce)
        return self.channel
386
387 1
    def close(self):
        """
        to be called when client has disconnected to ensure we really close
        everything we should
        """
        # reported through the logger like every other message of this
        # class instead of being printed to stdout
        self.logger.info("Cleanup client connection: %s", self.name)
        if self.session:
            # True: the subscriptions of the session are deleted as well
            self.session.close_session(True)
395