Completed
Pull Request — master (#490)
by Olivier
05:51
created

MessageChunk.encrypted_size()   A

Complexity

Conditions 2

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2.0932

Importance

Changes 1
Bugs 1 Features 0
Metric Value
cc 2
c 1
b 1
f 0
dl 0
loc 7
ccs 5
cts 7
cp 0.7143
crap 2.0932
rs 9.4285
1 1
import hashlib
2 1
from datetime import datetime
3 1
import logging
4
5 1
from opcua.ua.ua_binary import struct_from_binary, struct_to_binary, header_from_binary, header_to_binary
6 1
from opcua import ua
7
8
9 1
logger = logging.getLogger('opcua.uaprotocol')
10
11
12 1
class MessageChunk(ua.FrozenClass):
13
    """
14
    Message Chunk, as described in OPC UA specs Part 6, 6.7.2.
15
    """
16
17 1
    def __init__(self, security_policy, body=b'', msg_type=ua.MessageType.SecureMessage, chunk_type=ua.ChunkType.Single):
18 1
        self.MessageHeader = ua.Header(msg_type, chunk_type)
19 1
        if msg_type in (ua.MessageType.SecureMessage, ua.MessageType.SecureClose):
20 1
            self.SecurityHeader = ua.SymmetricAlgorithmHeader()
21 1
        elif msg_type == ua.MessageType.SecureOpen:
22 1
            self.SecurityHeader = ua.AsymmetricAlgorithmHeader()
23
        else:
24
            raise ua.UaError("Unsupported message type: {0}".format(msg_type))
25 1
        self.SequenceHeader = ua.SequenceHeader()
26 1
        self.Body = body
27 1
        self.security_policy = security_policy
28
29 1
    @staticmethod
30
    def from_binary(security_policy, data):
31 1
        h = header_from_binary(data)
32 1
        return MessageChunk.from_header_and_body(security_policy, h, data)
33
34 1
    @staticmethod
35
    def from_header_and_body(security_policy, header, buf):
36 1
        assert len(buf) >= header.body_size, 'Full body expected here'
37 1
        data = buf.copy(header.body_size)
38 1
        buf.skip(header.body_size)
39 1
        if header.MessageType in (ua.MessageType.SecureMessage, ua.MessageType.SecureClose):
40 1
            security_header = struct_from_binary(ua.SymmetricAlgorithmHeader, data)
41 1
            crypto = security_policy.symmetric_cryptography
42 1
        elif header.MessageType == ua.MessageType.SecureOpen:
43 1
            security_header = struct_from_binary(ua.AsymmetricAlgorithmHeader, data)
44 1
            crypto = security_policy.asymmetric_cryptography
45
        else:
46
            raise ua.UaError("Unsupported message type: {0}".format(header.MessageType))
47 1
        obj = MessageChunk(crypto)
48 1
        obj.MessageHeader = header
49 1
        obj.SecurityHeader = security_header
50 1
        decrypted = crypto.decrypt(data.read(len(data)))
51 1
        signature_size = crypto.vsignature_size()
52 1
        if signature_size > 0:
53 1
            signature = decrypted[-signature_size:]
54 1
            decrypted = decrypted[:-signature_size]
55 1
            crypto.verify(header_to_binary(obj.MessageHeader) + struct_to_binary(obj.SecurityHeader) + decrypted, signature)
56 1
        data = ua.utils.Buffer(crypto.remove_padding(decrypted))
57 1
        obj.SequenceHeader = struct_from_binary(ua.SequenceHeader, data)
58 1
        obj.Body = data.read(len(data))
59 1
        return obj
60
61 1
    def encrypted_size(self, plain_size):
62 1
        size = plain_size + self.security_policy.signature_size()
63 1
        pbs = self.security_policy.plain_block_size()
64 1
        if size % pbs != 0:
65
            print("ENC", plain_size, size, pbs)
66
            raise ua.UaError("Encryption error")
67 1
        return size // pbs * self.security_policy.encrypted_block_size()
68
69 1
    def to_binary(self):
70 1
        security = struct_to_binary(self.SecurityHeader)
71 1
        encrypted_part = struct_to_binary(self.SequenceHeader) + self.Body
72 1
        encrypted_part += self.security_policy.padding(len(encrypted_part))
73 1
        self.MessageHeader.body_size = len(security) + self.encrypted_size(len(encrypted_part))
74 1
        header = header_to_binary(self.MessageHeader)
75 1
        encrypted_part += self.security_policy.signature(header + security + encrypted_part)
76 1
        return header + security + self.security_policy.encrypt(encrypted_part)
77
78 1
    @staticmethod
79
    def max_body_size(crypto, max_chunk_size):
80 1
        max_encrypted_size = max_chunk_size - ua.Header.max_size() - ua.SymmetricAlgorithmHeader.max_size()
81 1
        max_plain_size = (max_encrypted_size // crypto.encrypted_block_size()) * crypto.plain_block_size()
82 1
        return max_plain_size - ua.SequenceHeader.max_size() - crypto.signature_size() - crypto.min_padding_size()
83
84 1
    @staticmethod
85 1
    def message_to_chunks(security_policy, body, max_chunk_size,
86
                          message_type=ua.MessageType.SecureMessage, channel_id=1, request_id=1, token_id=1):
87
        """
88
        Pack message body (as binary string) into one or more chunks.
89
        Size of each chunk will not exceed max_chunk_size.
90
        Returns a list of MessageChunks. SequenceNumber is not initialized here,
91
        it must be set by Secure Channel driver.
92
        """
93 1
        if message_type == ua.MessageType.SecureOpen:
94
            # SecureOpen message must be in a single chunk (specs, Part 6, 6.7.2)
95 1
            chunk = MessageChunk(security_policy.asymmetric_cryptography, body, message_type, ua.ChunkType.Single)
96 1
            chunk.SecurityHeader.SecurityPolicyURI = security_policy.URI
97 1
            if security_policy.client_certificate:
98 1
                chunk.SecurityHeader.SenderCertificate = security_policy.client_certificate
99 1
            if security_policy.server_certificate:
100 1
                chunk.SecurityHeader.ReceiverCertificateThumbPrint =\
101
                    hashlib.sha1(security_policy.server_certificate).digest()
102 1
            chunk.MessageHeader.ChannelId = channel_id
103 1
            chunk.SequenceHeader.RequestId = request_id
104 1
            return [chunk]
105
106 1
        crypto = security_policy.symmetric_cryptography
107 1
        max_size = MessageChunk.max_body_size(crypto, max_chunk_size)
108
109 1
        chunks = []
110 1
        for i in range(0, len(body), max_size):
111 1
            part = body[i:i + max_size]
112 1
            if i + max_size >= len(body):
113 1
                chunk_type = ua.ChunkType.Single
114
            else:
115 1
                chunk_type = ua.ChunkType.Intermediate
116 1
            chunk = MessageChunk(crypto, part, message_type, chunk_type)
117 1
            chunk.SecurityHeader.TokenId = token_id
118 1
            chunk.MessageHeader.ChannelId = channel_id
119 1
            chunk.SequenceHeader.RequestId = request_id
120 1
            chunks.append(chunk)
121 1
        return chunks
122
123 1
    def __str__(self):
124
        return "{0}({1}, {2}, {3}, {4} bytes)".format(self.__class__.__name__,
125
                                                      self.MessageHeader, self.SequenceHeader,
126
                                                      self.SecurityHeader, len(self.Body))
127 1
    __repr__ = __str__
128
129
130 1
class SecureConnection(object):
131
    """
132
    Common logic for client and server
133
    """
134
135 1
    def __init__(self, security_policy):
136 1
        self._sequence_number = 0
137 1
        self._peer_sequence_number = None
138 1
        self._incoming_parts = []
139 1
        self.security_policy = security_policy
140 1
        self._policies = []
141 1
        self.channel = ua.OpenSecureChannelResult()
142 1
        self._old_tokens = []
143 1
        self._open = False
144 1
        self._max_chunk_size = 65536
145
146 1
    def set_channel(self, channel):
147
        """
148
        Called on client side when getting secure channel data from server
149
        """
150 1
        self.channel = channel
151
152 1
    def open(self, params, server):
153
        """
154
        called on server side to open secure channel
155
        """
156 1
        if not self._open or params.RequestType == ua.SecurityTokenRequestType.Issue:
157 1
            self._open = True
158 1
            self.channel = ua.OpenSecureChannelResult()
159 1
            self.channel.SecurityToken.TokenId = 13  # random value
160 1
            self.channel.SecurityToken.ChannelId = server.get_new_channel_id()
161 1
            self.channel.SecurityToken.RevisedLifetime = params.RequestedLifetime
162
        else:
163
            self._old_tokens.append(self.channel.SecurityToken.TokenId)
164 1
        self.channel.SecurityToken.TokenId += 1
165 1
        self.channel.SecurityToken.CreatedAt = datetime.utcnow()
166 1
        self.channel.SecurityToken.RevisedLifetime = params.RequestedLifetime
167 1
        self.channel.ServerNonce = ua.utils.create_nonce(self.security_policy.symmetric_key_size)
168 1
        self.security_policy.make_symmetric_key(self.channel.ServerNonce, params.ClientNonce)
169 1
        return self.channel
170
171 1
    def close(self):
172 1
        self._open = False
173
174 1
    def is_open(self):
175
        return self._open
176
177 1
    def set_policy_factories(self, policies):
178
        """
179
        Set a list of available security policies.
180
        Use this in servers with multiple endpoints with different security
181
        """
182 1
        self._policies = policies
183
184 1
    @staticmethod
185 1
    def _policy_matches(policy, uri, mode=None):
186
        return policy.URI == uri and (mode is None or policy.Mode == mode)
187
188 1
    def select_policy(self, uri, peer_certificate, mode=None):
189 1
        for policy in self._policies:
190 1
            if policy.matches(uri, mode):
191 1
                self.security_policy = policy.create(peer_certificate)
192 1
                return
193 1
        if self.security_policy.URI != uri or (mode is not None and
194
                                                self.security_policy.Mode != mode):
195
            raise ua.UaError("No matching policy: {0}, {1}".format(uri, mode))
196
197
198 1
    def message_to_binary(self, message, message_type=ua.MessageType.SecureMessage, request_id=0, algohdr=None):
199
        """
200
        Convert OPC UA secure message to binary.
201
        The only supported types are SecureOpen, SecureMessage, SecureClose
202
        if message_type is SecureMessage, the AlgoritmHeader should be passed as arg
203
        """
204 1
        if algohdr is None:
205 1
            token_id = self.channel.SecurityToken.TokenId
206
        else:
207 1
            token_id = algohdr.TokenId
208 1
        chunks = MessageChunk.message_to_chunks(
209
            self.security_policy, message, self._max_chunk_size,
210
            message_type=message_type,
211
            channel_id=self.channel.SecurityToken.ChannelId,
212
            request_id=request_id,
213
            token_id=token_id)
214 1
        for chunk in chunks:
215 1
            self._sequence_number += 1
216 1
            if self._sequence_number >= (1 << 32):
217
                logger.debug("Wrapping sequence number: %d -> 1", self._sequence_number)
218
                self._sequence_number = 1
219 1
            chunk.SequenceHeader.SequenceNumber = self._sequence_number
220 1
        return b"".join([chunk.to_binary() for chunk in chunks])
221
222
223 1
    def _check_incoming_chunk(self, chunk):
224 1
        assert isinstance(chunk, MessageChunk), "Expected chunk, got: {0}".format(chunk)
225 1
        if chunk.MessageHeader.MessageType != ua.MessageType.SecureOpen:
226 1
            if chunk.MessageHeader.ChannelId != self.channel.SecurityToken.ChannelId:
227
                raise ua.UaError("Wrong channel id {0}, expected {1}".format(
228
                    chunk.MessageHeader.ChannelId,
229
                    self.channel.SecurityToken.ChannelId))
230 1
            if chunk.SecurityHeader.TokenId != self.channel.SecurityToken.TokenId:
231
                if chunk.SecurityHeader.TokenId not in self._old_tokens:
232
                    logger.warning("Received a chunk with wrong token id %s, expected %s", chunk.SecurityHeader.TokenId, self.channel.SecurityToken.TokenId)
233
234
                    #raise UaError("Wrong token id {}, expected {}, old tokens are {}".format(
235
                        #chunk.SecurityHeader.TokenId,
236
                        #self.channel.SecurityToken.TokenId,
237
                        #self._old_tokens))
238
239
                else:
240
                    # Do some cleanup, spec says we can remove old tokens when new one are used
241
                    idx = self._old_tokens.index(chunk.SecurityHeader.TokenId)
242
                    if idx != 0:
243
                        self._old_tokens = self._old_tokens[idx:]
244 1
        if self._incoming_parts:
245
            if self._incoming_parts[0].SequenceHeader.RequestId != chunk.SequenceHeader.RequestId:
246
                raise ua.UaError("Wrong request id {0}, expected {1}".format(
247
                    chunk.SequenceHeader.RequestId,
248
                    self._incoming_parts[0].SequenceHeader.RequestId))
249
250
        # sequence number must be incremented or wrapped
251 1
        num = chunk.SequenceHeader.SequenceNumber
252 1
        if self._peer_sequence_number is not None:
253 1
            if num != self._peer_sequence_number + 1:
254
                wrap = (1 << 32) - 1024
255
                if num < 1024 and self._peer_sequence_number >= wrap:
256
                    # specs Part 6, 6.7.2
257
                    logger.debug("Sequence number wrapped: %d -> %d",
258
                                 self._peer_sequence_number, num)
259
                else:
260
                    raise ua.UaError(
261
                        "Wrong sequence {0} -> {1} (server bug or replay attack)"
262
                        .format(self._peer_sequence_number, num))
263 1
        self._peer_sequence_number = num
264
265 1
    def receive_from_header_and_body(self, header, body):
266
        """
267
        Convert MessageHeader and binary body to OPC UA TCP message (see OPC UA
268
        specs Part 6, 7.1: Hello, Acknowledge or ErrorMessage), or a Message
269
        object, or None (if intermediate chunk is received)
270
        """
271 1
        if header.MessageType == ua.MessageType.SecureOpen:
272 1
            data = body.copy(header.body_size)
273 1
            security_header = struct_from_binary(ua.AsymmetricAlgorithmHeader, data)
274 1
            self.select_policy(security_header.SecurityPolicyURI, security_header.SenderCertificate)
275
276 1
        if header.MessageType in (ua.MessageType.SecureMessage,
277
                                  ua.MessageType.SecureOpen,
278
                                  ua.MessageType.SecureClose):
279 1
            chunk = MessageChunk.from_header_and_body(self.security_policy,
280
                                                      header, body)
281 1
            return self._receive(chunk)
282 1
        elif header.MessageType == ua.MessageType.Hello:
283 1
            msg = struct_from_binary(ua.Hello, body)
284 1
            self._max_chunk_size = msg.ReceiveBufferSize
285 1
            return msg
286 1
        elif header.MessageType == ua.MessageType.Acknowledge:
287 1
            msg = struct_from_binary(ua.Acknowledge, body)
288 1
            self._max_chunk_size = msg.SendBufferSize
289 1
            return msg
290
        elif header.MessageType == ua.MessageType.Error:
291
            msg = struct_from_binary(ua.ErrorMessage, body)
292
            logger.warning("Received an error: %s", msg)
293
            return msg
294
        else:
295
            raise ua.UaError("Unsupported message type {0}".format(header.MessageType))
296
297 1
    def receive_from_socket(self, socket):
298
        """
299
        Convert binary stream to OPC UA TCP message (see OPC UA
300
        specs Part 6, 7.1: Hello, Acknowledge or ErrorMessage), or a Message
301
        object, or None (if intermediate chunk is received)
302
        """
303 1
        logger.debug("Waiting for header")
304 1
        header = header_from_binary(socket)
305 1
        logger.info("received header: %s", header)
306 1
        body = socket.read(header.body_size)
307 1
        if len(body) != header.body_size:
308
            raise ua.UaError("{0} bytes expected, {1} available".format(header.body_size, len(body)))
309 1
        return self.receive_from_header_and_body(header, ua.utils.Buffer(body))
310
311 1
    def _receive(self, msg):
312 1
        self._check_incoming_chunk(msg)
313 1
        self._incoming_parts.append(msg)
314 1
        if msg.MessageHeader.ChunkType == ua.ChunkType.Intermediate:
315
            return None
316 1
        if msg.MessageHeader.ChunkType == ua.ChunkType.Abort:
317
            err = struct_from_binary(ua.ErrorMessage, ua.utils.Buffer(msg.Body))
318
            logger.warning("Message %s aborted: %s", msg, err)
319
            # specs Part 6, 6.7.3 say that aborted message shall be ignored
320
            # and SecureChannel should not be closed
321
            self._incoming_parts = []
322
            return None
323 1
        elif msg.MessageHeader.ChunkType == ua.ChunkType.Single:
324 1
            message = ua.Message(self._incoming_parts)
325 1
            self._incoming_parts = []
326 1
            return message
327
        else:
328
            raise ua.UaError("Unsupported chunk type: {0}".format(msg))
329
330
331