Completed
Pull Request — master (#206)
by Olivier
03:46
created

UaProcessor.set_policies()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 2
ccs 1
cts 1
cp 1
crap 1
rs 10
1
2 1
import logging
3 1
from threading import RLock, Lock
4 1
from datetime import datetime
5 1
import time
6
7 1
from opcua import ua
8 1
from opcua.common import utils
9
10
11 1
class PublishRequestData(object):
12
13 1
    def __init__(self):
14 1
        self.requesthdr = None
15 1
        self.algohdr = None
16 1
        self.seqhdr = None
17 1
        self.timestamp = time.time()
18
19
20 1
class UaProcessor(object):
21
22 1
    def __init__(self, internal_server, socket):
23 1
        self.logger = logging.getLogger(__name__)
24 1
        self.iserver = internal_server
25 1
        self.name = socket.get_extra_info('peername')
26 1
        self.sockname = socket.get_extra_info('sockname')
27 1
        self.session = None
28 1
        self.socket = socket
29 1
        self._socketlock = Lock()
30 1
        self._datalock = RLock()
31 1
        self._publishdata_queue = []
32 1
        self._publish_result_queue = []  # used when we need to wait for PublishRequest
33 1
        self._connection = ua.SecureConnection(ua.SecurityPolicy())
34 1
35
    def set_policies(self, policies):
36 1
        self._connection.set_policy_factories(policies)
37 1
38
    def send_response(self, requesthandle, algohdr, seqhdr, response, msgtype=ua.MessageType.SecureMessage):
39 1
        with self._socketlock:
40 1
            response.ResponseHeader.RequestHandle = requesthandle
41 1
            data = self._connection.message_to_binary(response.to_binary(), message_type=msgtype, request_id=seqhdr.RequestId, algohdr=algohdr)
42 1
            self.socket.write(data)
43 1
44
    def open_secure_channel(self, algohdr, seqhdr, body):
45 1
        request = ua.OpenSecureChannelRequest.from_binary(body)
46 1
47
        self._connection.select_policy(algohdr.SecurityPolicyURI, algohdr.SenderCertificate, request.Parameters.SecurityMode)
48 1
        channel = self._connection.open(request.Parameters, self.iserver)
49 1
        # send response
50
        response = ua.OpenSecureChannelResponse()
51 1
        response.Parameters = channel
52 1
        self.send_response(request.RequestHeader.RequestHandle, None, seqhdr, response, ua.MessageType.SecureOpen)
53 1
54
    def forward_publish_response(self, result):
55 1
        self.logger.info("forward publish response %s", result)
56 1
        with self._datalock:
57 1
            while True:
58 1
                if len(self._publishdata_queue) == 0:
59 1
                    self._publish_result_queue.append(result)
60
                    self.logger.info("Server wants to send publish answer but no publish request is available, enqueing notification, length of result queue is %s", len(self._publish_result_queue))
61
                    return
62
                requestdata = self._publishdata_queue.pop(0)
63 1
                if time.time() - requestdata.timestamp < requestdata.requesthdr.TimeoutHint / 1000:
64 1
                    break
65 1
66
        response = ua.PublishResponse()
67 1
        response.Parameters = result
68 1
69
        self.send_response(requestdata.requesthdr.RequestHandle, requestdata.algohdr, requestdata.seqhdr, response)
70 1
71
    def process(self, header, body):
72 1
        msg = self._connection.receive_from_header_and_body(header, body)
73 1
        if isinstance(msg, ua.Message):
74 1
            if header.MessageType == ua.MessageType.SecureOpen:
75 1
                self.open_secure_channel(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())
76 1
77
            elif header.MessageType == ua.MessageType.SecureClose:
78 1
                self._connection.close()
79 1
                return False
80
81 1
            elif header.MessageType == ua.MessageType.SecureMessage:
82
                return self.process_message(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())
83 1
84 1
        elif isinstance(msg, ua.Hello):
85
            ack = ua.Acknowledge()
86 1
            ack.ReceiveBufferSize = msg.ReceiveBufferSize
87 1
            ack.SendBufferSize = msg.SendBufferSize
88 1
            data = self._connection.tcp_to_binary(ua.MessageType.Acknowledge, ack)
89 1
            self.socket.write(data)
90 1
91 1
        elif isinstance(msg, ua.ErrorMessage):
92
            self.logger.warning("Received an error message type")
93
94
        else:
95
            self.logger.warning("Unsupported message type: %s", header.MessageType)
96
            raise utils.ServiceError(ua.StatusCodes.BadTcpMessageTypeInvalid)
97
        return True
98
99 1
    def process_message(self, algohdr, seqhdr, body):
100
        typeid = ua.NodeId.from_binary(body)
101 1
        requesthdr = ua.RequestHeader.from_binary(body)
102 1
        try:
103 1
            return self._process_message(typeid, requesthdr, algohdr, seqhdr, body)
104 1
        except utils.ServiceError as e:
105 1
            status = ua.StatusCode(e.code)
106 1
            response = ua.ServiceFault()
107 1
            response.ResponseHeader.ServiceResult = status
108 1
            self.logger.info("sending service fault response: %s (%s)", status.doc, status.name)
109 1
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
110 1
            return True
111 1
112 1
    def _process_message(self, typeid, requesthdr, algohdr, seqhdr, body):
113
        if typeid == ua.NodeId(ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary):
114 1
            self.logger.info("Create session request")
115 1
            params = ua.CreateSessionParameters.from_binary(body)
116 1
117 1
            self.session = self.iserver.create_session(self.name, external=True)  # create the session on server
118
            sessiondata = self.session.create_session(params, sockname=self.sockname)  # get a session creation result to send back
119 1
120 1
            response = ua.CreateSessionResponse()
121
            response.Parameters = sessiondata
122 1
            response.Parameters.ServerCertificate = self._connection._security_policy.client_certificate
123 1
            if self._connection._security_policy.server_certificate is None:
124 1
                data = params.ClientNonce
125 1
            else:
126 1
                data = self._connection._security_policy.server_certificate + params.ClientNonce
127
            response.Parameters.ServerSignature.Signature = self._connection._security_policy.asymmetric_cryptography.signature(data)
128 1
            response.Parameters.ServerSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
129 1
130 1
            self.logger.info("sending create sesssion response")
131
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
132 1
133 1
        elif typeid == ua.NodeId(ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary):
134
            self.logger.info("Close session request")
135 1
            deletesubs = ua.unpack_uatype('Boolean', body)
136 1
137 1
            self.session.close_session(deletesubs)
138
139 1
            response = ua.CloseSessionResponse()
140
            self.logger.info("sending close sesssion response")
141 1
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
142 1
143 1
        elif typeid == ua.NodeId(ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary):
144
            self.logger.info("Activate session request")
145 1
            params = ua.ActivateSessionParameters.from_binary(body)
146 1
147 1
            if not self.session:
148
                self.logger.info("request to activate non-existing session")
149 1
                raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)
150
151
            if self._connection._security_policy.client_certificate is None:
152
                data = self.session.nonce
153 1
            else:
154 1
                data = self._connection._security_policy.client_certificate + self.session.nonce
155
            self._connection._security_policy.asymmetric_cryptography.verify(data, params.ClientSignature.Signature)
156 1
157 1
            result = self.session.activate_session(params)
158
159 1
            response = ua.ActivateSessionResponse()
160
            response.Parameters = result
161 1
162 1
            self.logger.info("sending read response")
163
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
164 1
165 1
        elif typeid == ua.NodeId(ua.ObjectIds.ReadRequest_Encoding_DefaultBinary):
166
            self.logger.info("Read request")
167 1
            params = ua.ReadParameters.from_binary(body)
168 1
169 1
            results = self.session.read(params)
170
171 1
            response = ua.ReadResponse()
172
            response.Results = results
173 1
174 1
            self.logger.info("sending read response")
175
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
176 1
177 1
        elif typeid == ua.NodeId(ua.ObjectIds.WriteRequest_Encoding_DefaultBinary):
178
            self.logger.info("Write request")
179 1
            params = ua.WriteParameters.from_binary(body)
180 1
181 1
            results = self.session.write(params)
182
183 1
            response = ua.WriteResponse()
184
            response.Results = results
185 1
186 1
            self.logger.info("sending write response")
187
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
188 1
189 1
        elif typeid == ua.NodeId(ua.ObjectIds.BrowseRequest_Encoding_DefaultBinary):
190
            self.logger.info("Browse request")
191 1
            params = ua.BrowseParameters.from_binary(body)
192 1
193 1
            results = self.session.browse(params)
194
195 1
            response = ua.BrowseResponse()
196
            response.Results = results
197 1
198 1
            self.logger.info("sending browse response")
199
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
200 1
201 1
        elif typeid == ua.NodeId(ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary):
202
            self.logger.info("get endpoints request")
203 1
            params = ua.GetEndpointsParameters.from_binary(body)
204 1
205 1
            endpoints = self.iserver.get_endpoints(params, sockname=self.sockname)
206
207 1
            response = ua.GetEndpointsResponse()
208
            response.Endpoints = endpoints
209 1
210 1
            self.logger.info("sending get endpoints response")
211
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
212 1
213 1
        elif typeid == ua.NodeId(ua.ObjectIds.FindServersRequest_Encoding_DefaultBinary):
214
            self.logger.info("find servers request")
215 1
            params = ua.FindServersParameters.from_binary(body)
216 1
217 1
            servers = self.iserver.find_servers(params)
218
219 1
            response = ua.FindServersResponse()
220
            response.Servers = servers
221 1
222 1
            self.logger.info("sending find servers response")
223
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
224 1
225 1
        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServerRequest_Encoding_DefaultBinary):
226
            self.logger.info("register server request")
227 1
            serv = ua.RegisteredServer.from_binary(body)
228 1
229 1
            self.iserver.register_server(serv)
230
231 1
            response = ua.RegisterServerResponse()
232
233 1
            self.logger.info("sending register server response")
234
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
235 1
236 1
        elif typeid == ua.NodeId(ua.ObjectIds.RegisterServer2Request_Encoding_DefaultBinary):
237
            self.logger.info("register server 2 request")
238 1
            params = ua.RegisterServer2Parameters.from_binary(body)
239
240
            results = self.iserver.register_server2(params)
241
242
            response = ua.RegisterServer2Response()
243
            response.ConfigurationResults = results
244
245
            self.logger.info("sending register server 2 response")
246
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
247
248
        elif typeid == ua.NodeId(ua.ObjectIds.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary):
249
            self.logger.info("translate browsepaths to nodeids request")
250 1
            params = ua.TranslateBrowsePathsToNodeIdsParameters.from_binary(body)
251 1
252 1
            paths = self.session.translate_browsepaths_to_nodeids(params.BrowsePaths)
253
254 1
            response = ua.TranslateBrowsePathsToNodeIdsResponse()
255
            response.Results = paths
256 1
257 1
            self.logger.info("sending translate browsepaths to nodeids response")
258
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
259 1
260 1
        elif typeid == ua.NodeId(ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary):
261
            self.logger.info("add nodes request")
262 1
            params = ua.AddNodesParameters.from_binary(body)
263 1
264 1
            results = self.session.add_nodes(params.NodesToAdd)
265
266 1
            response = ua.AddNodesResponse()
267
            response.Results = results
268 1
269 1
            self.logger.info("sending add node response")
270
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
271 1
272 1
        elif typeid == ua.NodeId(ua.ObjectIds.DeleteNodesRequest_Encoding_DefaultBinary):
273
            self.logger.info("delete nodes request")
274 1
            params = ua.DeleteNodesParameters.from_binary(body)
275 1
276 1
            results = self.session.delete_nodes(params.NodesToDelete)
277
278 1
            response = ua.DeleteNodesResponse()
279
            response.Results = results
280 1
281 1
            self.logger.info("sending delete node response")
282
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
283 1
284 1
        elif typeid == ua.NodeId(ua.ObjectIds.CreateSubscriptionRequest_Encoding_DefaultBinary):
285
            self.logger.info("create subscription request")
286 1
            params = ua.CreateSubscriptionParameters.from_binary(body)
287 1
288 1
            result = self.session.create_subscription(params, self.forward_publish_response)
289
290 1
            response = ua.CreateSubscriptionResponse()
291
            response.Parameters = result
292 1
293 1
            self.logger.info("sending create subscription response")
294
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
295 1
296 1
        elif typeid == ua.NodeId(ua.ObjectIds.DeleteSubscriptionsRequest_Encoding_DefaultBinary):
297
            self.logger.info("delete subscriptions request")
298 1
            params = ua.DeleteSubscriptionsParameters.from_binary(body)
299 1
300 1
            results = self.session.delete_subscriptions(params.SubscriptionIds)
301
302 1
            response = ua.DeleteSubscriptionsResponse()
303
            response.Results = results
304 1
305 1
            self.logger.info("sending delte subscription response")
306
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
307 1
308 1
        elif typeid == ua.NodeId(ua.ObjectIds.CreateMonitoredItemsRequest_Encoding_DefaultBinary):
309
            self.logger.info("create monitored items request")
310 1
            params = ua.CreateMonitoredItemsParameters.from_binary(body)
311 1
            results = self.session.create_monitored_items(params)
312 1
313 1
            response = ua.CreateMonitoredItemsResponse()
314
            response.Results = results
315 1
316 1
            self.logger.info("sending create monitored items response")
317
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
318 1
319 1
        elif typeid == ua.NodeId(ua.ObjectIds.ModifyMonitoredItemsRequest_Encoding_DefaultBinary):
320
            self.logger.info("modify monitored items request")
321 1
            params = ua.ModifyMonitoredItemsParameters.from_binary(body)
322
            results = self.session.modify_monitored_items(params)
323
324
            response = ua.ModifyMonitoredItemsResponse()
325
            response.Results = results
326
327
            self.logger.info("sending modify monitored items response")
328
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
329
330
        elif typeid == ua.NodeId(ua.ObjectIds.DeleteMonitoredItemsRequest_Encoding_DefaultBinary):
331
            self.logger.info("delete monitored items request")
332 1
            params = ua.DeleteMonitoredItemsParameters.from_binary(body)
333 1
334 1
            results = self.session.delete_monitored_items(params)
335
336 1
            response = ua.DeleteMonitoredItemsResponse()
337
            response.Results = results
338 1
339 1
            self.logger.info("sending delete monitored items response")
340
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
341 1
342 1
        elif typeid == ua.NodeId(ua.ObjectIds.HistoryReadRequest_Encoding_DefaultBinary):
343
            self.logger.info("history read request")
344 1
            params = ua.HistoryReadParameters.from_binary(body)
345
346
            results = self.session.history_read(params)
347
348
            response = ua.HistoryReadResponse()
349
            response.Results = results
350
351
            self.logger.info("sending history read response")
352
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
353
354
        elif typeid == ua.NodeId(ua.ObjectIds.PublishRequest_Encoding_DefaultBinary):
355
            self.logger.info("publish request")
356 1
357 1
            if not self.session:
358
                return False
359 1
360
            params = ua.PublishParameters.from_binary(body)
361
362 1
            data = PublishRequestData()
363
            data.requesthdr = requesthdr
364 1
            data.seqhdr = seqhdr
365 1
            data.algohdr = algohdr
366 1
            with self._datalock:
367 1
                self._publishdata_queue.append(data)  # will be used to send publish answers from server
368 1
                if self._publish_result_queue:
369 1
                    result = self._publish_result_queue.pop(0)
370 1
                    self.forward_publish_response(result)
371
            self.session.publish(params.SubscriptionAcknowledgements)
372
            self.logger.info("publish forward to server")
373 1
374 1
        elif typeid == ua.NodeId(ua.ObjectIds.RepublishRequest_Encoding_DefaultBinary):
375
            self.logger.info("re-publish request")
376 1
377
            params = ua.RepublishParameters.from_binary(body)
378
            msg = self.session.republish(params)
379
380
            response = ua.RepublishResponse()
381
            response.NotificationMessage = msg
382
383
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
384
385
        elif typeid == ua.NodeId(ua.ObjectIds.CloseSecureChannelRequest_Encoding_DefaultBinary):
386
            self.logger.info("close secure channel request")
387 1
            self._connection.close()
388
            response = ua.CloseSecureChannelResponse()
389
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
390
            return False
391
392
        elif typeid == ua.NodeId(ua.ObjectIds.CallRequest_Encoding_DefaultBinary):
393
            self.logger.info("call request")
394 1
395 1
            params = ua.CallParameters.from_binary(body)
396
397 1
            results = self.session.call(params.MethodsToCall)
398
399 1
            response = ua.CallResponse()
400
            response.Results = results
401 1
402 1
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
403
404 1
        else:
405
            self.logger.warning("Unknown message received %s", typeid)
406
            raise utils.ServiceError(ua.StatusCodes.BadNotImplemented)
407 1
408 1
        return True
409
410 1
    def close(self):
411
        """
412 1
        to be called when client has disconnected to ensure we really close
413 1
        everything we should
414 1
        """
415 1
        print("Cleanup client connection: ", self.name)
416 1
        if self.session:
417
            self.session.close_session(True)
418